aiice.core.huggingface
"""Client for accessing the AIICE dataset hosted on the Hugging Face Hub."""

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
from functools import lru_cache
from io import BytesIO
from typing import Any

import requests
from huggingface_hub import HfApi
from huggingface_hub.constants import DEFAULT_REQUEST_TIMEOUT
from huggingface_hub.errors import RemoteEntryNotFoundError
from huggingface_hub.file_download import http_get
from huggingface_hub.utils import build_hf_headers

from aiice.constants import (
    BYTES_IN_MB,
    DATASET_SHAPE,
    DEFAULT_BACKOFF,
    DEFAULT_RETRIES,
    HF_BASE_URL,
    HF_DATASET_REPO,
    HF_PACKAGE_NAME,
    HF_REPO_TYPE,
    KEY_DATASET_END,
    KEY_DATASET_START,
    KEY_FILES,
    KEY_PER_YEAR,
    KEY_SHAPE,
    KEY_SIZE_BYTES,
    KEY_SIZE_MB,
    MAX_DATASET_END,
    MIN_DATASET_START,
    YEAR_STATS_CACHE_SIZE,
)
from aiice.core.utils import (
    convert_step_to_delta,
    get_filename_template,
    retry_on_network_errors,
)


class HfDatasetClient:
    """
    Client for accessing the AIICE Hugging Face dataset.
    """

    def __init__(self):
        self._api_base_url = HF_BASE_URL
        self._api = HfApi(endpoint=self._api_base_url, library_name=HF_PACKAGE_NAME)
        self._api_headers = build_hf_headers(library_name=HF_PACKAGE_NAME)

        self._dataset_repo = HF_DATASET_REPO
        self._dataset_repo_type = HF_REPO_TYPE

        self._min_dataset_start = MIN_DATASET_START
        self._max_dataset_end = MAX_DATASET_END
        self._shape = DATASET_SHAPE

        # Per-instance memo for _fetch_year_stats results. Using
        # functools.lru_cache on the bound method would key the cache on
        # `self` and keep every instance alive for the cache's lifetime
        # (ruff B019), so we cache here instead.
        self._year_stats_cache: dict[int, tuple[int, int, int]] = {}

    @property
    def dataset_start(self) -> date:
        """
        Earliest available date in the dataset.
        """
        return self._min_dataset_start

    @property
    def dataset_end(self) -> date:
        """
        Latest available date in the dataset.
        """
        return self._max_dataset_end

    @property
    def shape(self) -> tuple[int, ...]:
        """
        Shape of a single dataset sample.
        """
        return self._shape

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def info(self, per_year: bool = False, threads: int = 24) -> dict[str, Any]:
        """
        Collect dataset size statistics.

        Args:
            per_year (`bool`, optional): If True, include per-year file and size statistics. Defaults to False.
            threads (`int`, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
        """
        total_files, total_size = 0, 0
        per_year_result: dict[int, dict[str, Any]] = {}

        # One tree-listing request per year, issued in parallel.
        with ThreadPoolExecutor(max_workers=threads) as executor:
            futures = [
                executor.submit(self._fetch_year_stats, year)
                for year in range(
                    self.dataset_start.year,
                    self.dataset_end.year + 1,
                )
            ]

            for future in as_completed(futures):
                year, files, size = future.result()

                per_year_result[year] = {
                    KEY_FILES: files,
                    KEY_SIZE_BYTES: size,
                    KEY_SIZE_MB: round(size / BYTES_IN_MB, 2),
                }
                total_files += files
                total_size += size

        result: dict[str, Any] = {
            KEY_DATASET_START: self.dataset_start,
            KEY_DATASET_END: self.dataset_end,
            KEY_SHAPE: self.shape,
            f"total_{KEY_FILES}": total_files,
            f"total_{KEY_SIZE_BYTES}": total_size,
            f"total_{KEY_SIZE_MB}": round(total_size / BYTES_IN_MB, 2),
        }

        if per_year:
            result[KEY_PER_YEAR] = per_year_result

        return result

    def get_filenames(
        self,
        start: date | None = None,
        end: date | None = None,
        step: int | str | None = None,
    ) -> list[str]:
        """
        Generate dataset filenames for a date range.

        Args:
            start (`date`, optional): Start date (inclusive). Defaults to dataset start.
            end (`date`, optional): End date (inclusive). Defaults to dataset end.
            step (`int` or `str`, optional): Step between files. If `int` - number of days.
                If `str` - format like `"1d"`, `"1w"`, `"1m"`, `"1y"`.
                For month or years steps (`"1m"`, `"2m"`, etc.), the date always lands on the last day
                of the month (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31).
                Defaults to 1 day.

        Raises:
            ValueError: If the range falls outside the dataset bounds or start > end.
        """
        start = start if start is not None else self.dataset_start
        end = end if end is not None else self.dataset_end

        # Bounds are inclusive, so equality with the dataset limits is allowed.
        if start < self.dataset_start:
            raise ValueError(f"date start value should be >= {self.dataset_start}")

        if end > self.dataset_end:
            raise ValueError(f"date end value should be <= {self.dataset_end}")

        if start > end:
            raise ValueError("start date must be <= date end")

        filenames: list[str] = []
        current = start
        delta = convert_step_to_delta(step=step)

        while current <= end:
            filenames.append(get_filename_template(current))
            current += delta

        return filenames

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def read_file(self, filename: str) -> bytes | None:
        """
        Load a dataset file from Hugging Face into memory.

        Args:
            filename (`str`): Relative path to the dataset file.

        Returns:
            The file contents as bytes, or None if the file does not exist.
        """
        url = f"{self._api_base_url}/datasets/{self._dataset_repo}/resolve/main/{filename}"
        buffer = BytesIO()
        try:
            http_get(
                url=url,
                temp_file=buffer,
                displayed_filename=filename,
                headers=self._api_headers,
            )
            return buffer.getvalue()

        # ignore if file isn't found
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to get file {filename}") from e

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def download_file(self, filename: str, local_dir: str) -> str | None:
        """
        Download a dataset file to a local directory.

        Args:
            filename (`str`): Dataset file path.
            local_dir (`str`): Target directory for download.

        Returns:
            Local path of the downloaded file, or None if the file does not exist.
        """
        try:
            return self._api.hf_hub_download(
                repo_id=self._dataset_repo,
                repo_type=self._dataset_repo_type,
                filename=filename,
                local_dir=local_dir,
            )

        # ignore if file isn't found
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to download file {filename}") from e

    def _fetch_year_stats(self, year: int) -> tuple[int, int, int]:
        """
        Return `(year, file_count, total_size_bytes)` for one year folder.

        Results are memoized in a per-instance dict (see __init__); the cache
        is cleared once it reaches YEAR_STATS_CACHE_SIZE entries to stay bounded.
        """
        cached = self._year_stats_cache.get(year)
        if cached is not None:
            return cached

        url = f"{self._api_base_url}/api/datasets/{self._dataset_repo}/tree/main/global_series/{year}"

        resp = requests.get(
            url, timeout=DEFAULT_REQUEST_TIMEOUT, headers=self._api_headers
        )
        resp.raise_for_status()

        files, size = 0, 0
        for item in resp.json():
            if item.get("type") != "file":
                continue

            files += 1
            size += item.get("size", 0)

        stats = (year, files, size)
        if len(self._year_stats_cache) >= YEAR_STATS_CACHE_SIZE:
            self._year_stats_cache.clear()
        self._year_stats_cache[year] = stats
        return stats
class
HfDatasetClient:
class HfDatasetClient:
    """
    Client wrapper around the AIICE dataset stored on the Hugging Face Hub.
    """

    def __init__(self):
        self._api_base_url = HF_BASE_URL
        self._api = HfApi(endpoint=self._api_base_url, library_name=HF_PACKAGE_NAME)
        self._api_headers = build_hf_headers(library_name=HF_PACKAGE_NAME)

        self._dataset_repo = HF_DATASET_REPO
        self._dataset_repo_type = HF_REPO_TYPE

        self._min_dataset_start = MIN_DATASET_START
        self._max_dataset_end = MAX_DATASET_END
        self._shape = DATASET_SHAPE

    @property
    def dataset_start(self) -> date:
        """Earliest date available in the dataset."""
        return self._min_dataset_start

    @property
    def dataset_end(self) -> date:
        """Latest date available in the dataset."""
        return self._max_dataset_end

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of one dataset sample."""
        return self._shape

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def info(self, per_year: bool = False, threads: int = 24) -> dict[str, any]:
        """
        Gather file-count and size statistics for the dataset.

        Args:
            per_year (`bool`, optional): If True, include per-year file and size statistics. Defaults to False.
            threads (`int`, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
        """
        years = range(self.dataset_start.year, self.dataset_end.year + 1)

        # Fan out one tree-listing request per year.
        with ThreadPoolExecutor(max_workers=threads) as pool:
            pending = [pool.submit(self._fetch_year_stats, y) for y in years]
            yearly = [task.result() for task in as_completed(pending)]

        total_files = sum(files for _, files, _ in yearly)
        total_size = sum(size for _, _, size in yearly)

        result: dict[str, any] = {
            KEY_DATASET_START: self.dataset_start,
            KEY_DATASET_END: self.dataset_end,
            KEY_SHAPE: self.shape,
            f"total_{KEY_FILES}": total_files,
            f"total_{KEY_SIZE_BYTES}": total_size,
            f"total_{KEY_SIZE_MB}": round(total_size / BYTES_IN_MB, 2),
        }

        if per_year:
            result[KEY_PER_YEAR] = {
                year: {
                    KEY_FILES: files,
                    KEY_SIZE_BYTES: size,
                    KEY_SIZE_MB: round(size / BYTES_IN_MB, 2),
                }
                for year, files, size in yearly
            }

        return result

    def get_filenames(
        self,
        start: date | None = None,
        end: date | None = None,
        step: int | str | None = None,
    ) -> list[str]:
        """
        Build the list of dataset filenames covering a date range.

        Args:
            start (`date`, optional): First date (inclusive). Defaults to dataset start.
            end (`date`, optional): Last date (inclusive). Defaults to dataset end.
            step (`int` or `str`, optional): Spacing between files. An `int` is a
                number of days; a `str` uses units like `"1d"`, `"1w"`, `"1m"`, `"1y"`.
                Month and year steps always land on the last day of the month
                (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31). Defaults to 1 day.
        """
        start = self.dataset_start if start is None else start
        end = self.dataset_end if end is None else end

        if start < self.dataset_start:
            raise ValueError(f"date start value should be > {self.dataset_start}")

        if end > self.dataset_end:
            raise ValueError(f"date end value should be < {self.dataset_end}")

        if start > end:
            raise ValueError("start date must be <= date end")

        delta = convert_step_to_delta(step=step)

        names: list[str] = []
        cursor = start
        while cursor <= end:
            names.append(get_filename_template(cursor))
            cursor += delta

        return names

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def read_file(self, filename: str) -> bytes | None:
        """
        Fetch a dataset file from Hugging Face and return its raw bytes.

        Args:
            filename (`str`): Relative path to the dataset file.
        """
        target_url = f"{self._api_base_url}/datasets/{self._dataset_repo}/resolve/main/(unknown)"
        sink = BytesIO()
        try:
            http_get(
                url=target_url,
                temp_file=sink,
                displayed_filename=filename,
                headers=self._api_headers,
            )

        # a missing remote file is reported as None, not an error
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to get file (unknown)") from e

        return sink.getvalue()

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def download_file(self, filename: str, local_dir: str) -> str | None:
        """
        Fetch a dataset file onto local disk.

        Args:
            filename (`str`): Dataset file path.
            local_dir (`str`): Target directory for download.
        """
        try:
            local_path = self._api.hf_hub_download(
                repo_id=self._dataset_repo,
                repo_type=self._dataset_repo_type,
                filename=filename,
                local_dir=local_dir,
            )

        # a missing remote file is reported as None, not an error
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to download file (unknown)") from e

        return local_path

    @lru_cache(maxsize=YEAR_STATS_CACHE_SIZE)
    def _fetch_year_stats(self, year: int) -> tuple[int, int, int]:
        # List one year's folder in the repo tree and tally its plain files.
        listing_url = f"{self._api_base_url}/api/datasets/{self._dataset_repo}/tree/main/global_series/{year}"

        response = requests.get(
            listing_url, timeout=DEFAULT_REQUEST_TIMEOUT, headers=self._api_headers
        )
        response.raise_for_status()

        file_entries = [e for e in response.json() if e.get("type") == "file"]
        return year, len(file_entries), sum(e.get("size", 0) for e in file_entries)
Client for accessing the AIICE Hugging Face dataset.
dataset_start: datetime.date
@property
def dataset_start(self) -> date:
    """Earliest date available in the dataset (inclusive lower bound)."""
    return self._min_dataset_start
Earliest available date in the dataset.
dataset_end: datetime.date
@property
def dataset_end(self) -> date:
    """Latest date available in the dataset (inclusive upper bound)."""
    return self._max_dataset_end
Latest available date in the dataset.
shape: tuple[int, ...]
@property
def shape(self) -> tuple[int, ...]:
    """Shape of one dataset sample."""
    return self._shape
Shape of a single dataset sample.
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def
info(self, per_year: bool = False, threads: int = 24) -> dict[str, any]:
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def info(self, per_year: bool = False, threads: int = 24) -> dict[str, any]:
    """
    Gather file-count and size statistics for the dataset.

    Args:
        per_year (`bool`, optional): If True, include per-year file and size statistics. Defaults to False.
        threads (`int`, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
    """
    years = range(self.dataset_start.year, self.dataset_end.year + 1)

    # Fan out one tree-listing request per year.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        pending = [pool.submit(self._fetch_year_stats, y) for y in years]
        yearly = [task.result() for task in as_completed(pending)]

    total_files = sum(files for _, files, _ in yearly)
    total_size = sum(size for _, _, size in yearly)

    result: dict[str, any] = {
        KEY_DATASET_START: self.dataset_start,
        KEY_DATASET_END: self.dataset_end,
        KEY_SHAPE: self.shape,
        f"total_{KEY_FILES}": total_files,
        f"total_{KEY_SIZE_BYTES}": total_size,
        f"total_{KEY_SIZE_MB}": round(total_size / BYTES_IN_MB, 2),
    }

    if per_year:
        result[KEY_PER_YEAR] = {
            year: {
                KEY_FILES: files,
                KEY_SIZE_BYTES: size,
                KEY_SIZE_MB: round(size / BYTES_IN_MB, 2),
            }
            for year, files, size in yearly
        }

    return result
Collect dataset size statistics.
Arguments:
- `per_year` (`bool`, optional): If True, include per-year file and size statistics. Defaults to False.
- `threads` (`int`, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
def
get_filenames( self, start: datetime.date | None = None, end: datetime.date | None = None, step: int | str | None = None) -> list[str]:
def get_filenames(
    self,
    start: date | None = None,
    end: date | None = None,
    step: int | str | None = None,
) -> list[str]:
    """
    Build the list of dataset filenames covering a date range.

    Args:
        start (`date`, optional): First date (inclusive). Defaults to dataset start.
        end (`date`, optional): Last date (inclusive). Defaults to dataset end.
        step (`int` or `str`, optional): Spacing between files. An `int` is a
            number of days; a `str` uses units like `"1d"`, `"1w"`, `"1m"`, `"1y"`.
            Month and year steps always land on the last day of the month
            (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31). Defaults to 1 day.
    """
    start = self.dataset_start if start is None else start
    end = self.dataset_end if end is None else end

    if start < self.dataset_start:
        raise ValueError(f"date start value should be > {self.dataset_start}")

    if end > self.dataset_end:
        raise ValueError(f"date end value should be < {self.dataset_end}")

    if start > end:
        raise ValueError("start date must be <= date end")

    delta = convert_step_to_delta(step=step)

    names: list[str] = []
    cursor = start
    while cursor <= end:
        names.append(get_filename_template(cursor))
        cursor += delta

    return names
Generate dataset filenames for a date range.
Arguments:
- `start` (`date`, optional): Start date (inclusive). Defaults to dataset start.
- `end` (`date`, optional): End date (inclusive). Defaults to dataset end.
- `step` (`int` or `str`, optional): Step between files. If `int` — number of days. If `str` — format like `"1d"`, `"1w"`, `"1m"`, `"1y"`. For month or year steps (`"1m"`, `"2m"`, etc.), the date always lands on the last day of the month (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31). Defaults to 1 day.
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def
read_file(self, filename: str) -> bytes | None:
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def read_file(self, filename: str) -> bytes | None:
    """
    Fetch a dataset file from Hugging Face and return its raw bytes.

    Args:
        filename (`str`): Relative path to the dataset file.
    """
    target_url = f"{self._api_base_url}/datasets/{self._dataset_repo}/resolve/main/(unknown)"
    sink = BytesIO()
    try:
        http_get(
            url=target_url,
            temp_file=sink,
            displayed_filename=filename,
            headers=self._api_headers,
        )

    # a missing remote file is reported as None, not an error
    except RemoteEntryNotFoundError:
        return None

    except Exception as e:
        raise RuntimeError(f"Failed to get file (unknown)") from e

    return sink.getvalue()
Load a dataset file from Hugging Face into memory.
Arguments:
- `filename` (`str`): Relative path to the dataset file.
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def
download_file(self, filename: str, local_dir: str) -> str | None:
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def download_file(self, filename: str, local_dir: str) -> str | None:
    """
    Fetch a dataset file onto local disk.

    Args:
        filename (`str`): Dataset file path.
        local_dir (`str`): Target directory for download.
    """
    try:
        local_path = self._api.hf_hub_download(
            repo_id=self._dataset_repo,
            repo_type=self._dataset_repo_type,
            filename=filename,
            local_dir=local_dir,
        )

    # a missing remote file is reported as None, not an error
    except RemoteEntryNotFoundError:
        return None

    except Exception as e:
        raise RuntimeError(f"Failed to download file (unknown)") from e

    return local_path
Download a dataset file to a local directory.
Arguments:
- `filename` (`str`): Dataset file path.
- `local_dir` (`str`): Target directory for download.