aiice.core.huggingface

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
from functools import lru_cache
from io import BytesIO
from typing import Any

import requests
from huggingface_hub import HfApi
from huggingface_hub.constants import DEFAULT_REQUEST_TIMEOUT
from huggingface_hub.errors import RemoteEntryNotFoundError
from huggingface_hub.file_download import http_get
from huggingface_hub.utils import build_hf_headers

from aiice.constants import (
    BYTES_IN_MB,
    DATASET_SHAPE,
    DEFAULT_BACKOFF,
    DEFAULT_RETRIES,
    HF_BASE_URL,
    HF_DATASET_REPO,
    HF_PACKAGE_NAME,
    HF_REPO_TYPE,
    KEY_DATASET_END,
    KEY_DATASET_START,
    KEY_FILES,
    KEY_PER_YEAR,
    KEY_SHAPE,
    KEY_SIZE_BYTES,
    KEY_SIZE_MB,
    MAX_DATASET_END,
    MIN_DATASET_START,
    YEAR_STATS_CACHE_SIZE,
)
from aiice.core.utils import (
    convert_step_to_delta,
    get_filename_template,
    retry_on_network_errors,
)


class HfDatasetClient:
    """
    Client for accessing the AIICE Hugging Face dataset.
    """

    def __init__(self):
        self._api_base_url = HF_BASE_URL
        self._api = HfApi(endpoint=self._api_base_url, library_name=HF_PACKAGE_NAME)
        self._api_headers = build_hf_headers(library_name=HF_PACKAGE_NAME)

        self._dataset_repo = HF_DATASET_REPO
        self._dataset_repo_type = HF_REPO_TYPE

        self._min_dataset_start, self._max_dataset_end = (
            MIN_DATASET_START,
            MAX_DATASET_END,
        )
        self._shape = DATASET_SHAPE

    @property
    def dataset_start(self) -> date:
        """
        Earliest available date in the dataset.
        """
        return self._min_dataset_start

    @property
    def dataset_end(self) -> date:
        """
        Latest available date in the dataset.
        """
        return self._max_dataset_end

    @property
    def shape(self) -> tuple[int, ...]:
        """
        Shape of a single dataset sample.
        """
        return self._shape

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def info(self, per_year: bool = False, threads: int = 24) -> dict[str, Any]:
        """
        Collect dataset size statistics.

        Args:
            per_year (`bool`, optional): If True, include per-year file and size statistics. Defaults to False.
            threads (`int`, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
        """
        total_files, total_size = 0, 0
        per_year_result = defaultdict(
            lambda: {
                KEY_FILES: 0,
                KEY_SIZE_BYTES: 0,
                KEY_SIZE_MB: 0.0,
            }
        )

        with ThreadPoolExecutor(max_workers=threads) as executor:
            futures = [
                executor.submit(self._fetch_year_stats, year)
                for year in range(
                    self.dataset_start.year,
                    self.dataset_end.year + 1,
                )
            ]

            for future in as_completed(futures):
                year, files, size = future.result()

                per_year_result[year][KEY_FILES] = files
                per_year_result[year][KEY_SIZE_BYTES] = size
                per_year_result[year][KEY_SIZE_MB] = round(size / BYTES_IN_MB, 2)

                total_files += files
                total_size += size

        result: dict[str, Any] = {
            KEY_DATASET_START: self.dataset_start,
            KEY_DATASET_END: self.dataset_end,
            KEY_SHAPE: self.shape,
            f"total_{KEY_FILES}": total_files,
            f"total_{KEY_SIZE_BYTES}": total_size,
            f"total_{KEY_SIZE_MB}": round(total_size / BYTES_IN_MB, 2),
        }

        if per_year:
            result[KEY_PER_YEAR] = dict(per_year_result)

        return result

    def get_filenames(
        self,
        start: date | None = None,
        end: date | None = None,
        step: int | str | None = None,
    ) -> list[str]:
        """
        Generate dataset filenames for a date range.

        Args:
            start (`date`, optional): Start date (inclusive). Defaults to dataset start.
            end (`date`, optional): End date (inclusive). Defaults to dataset end.
            step (`int` or `str`, optional): Step between files. If `int`, a number of days.
                If `str`, a format like `"1d"`, `"1w"`, `"1m"`, `"1y"`.
                For month or year steps (`"1m"`, `"2m"`, etc.), the date always lands on the last day
                of the month (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31).
                Defaults to 1 day.
        """
        start = start or self.dataset_start
        end = end or self.dataset_end

        if start < self.dataset_start:
            raise ValueError(f"start date must be >= {self.dataset_start}")

        if end > self.dataset_end:
            raise ValueError(f"end date must be <= {self.dataset_end}")

        if start > end:
            raise ValueError("start date must be <= end date")

        filenames: list[str] = []
        current = start
        delta = convert_step_to_delta(step=step)

        while current <= end:
            filenames.append(get_filename_template(current))
            current += delta

        return filenames

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def read_file(self, filename: str) -> bytes | None:
        """
        Load a dataset file from Hugging Face into memory.

        Args:
            filename (`str`): Relative path to the dataset file.
        """
        url = f"{self._api_base_url}/datasets/{self._dataset_repo}/resolve/main/{filename}"
        buffer = BytesIO()
        try:
            http_get(
                url=url,
                temp_file=buffer,
                displayed_filename=filename,
                headers=self._api_headers,
            )
            return buffer.getvalue()

        # return None if the file isn't found
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to get file {filename}") from e

    @retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
    def download_file(self, filename: str, local_dir: str) -> str | None:
        """
        Download a dataset file to a local directory.

        Args:
            filename (`str`): Dataset file path.
            local_dir (`str`): Target directory for download.
        """
        try:
            return self._api.hf_hub_download(
                repo_id=self._dataset_repo,
                repo_type=self._dataset_repo_type,
                filename=filename,
                local_dir=local_dir,
            )

        # return None if the file isn't found
        except RemoteEntryNotFoundError:
            return None

        except Exception as e:
            raise RuntimeError(f"Failed to download file {filename}") from e

    # results are cached per (client, year), so repeated info() calls avoid refetching
    @lru_cache(maxsize=YEAR_STATS_CACHE_SIZE)
    def _fetch_year_stats(self, year: int) -> tuple[int, int, int]:
        url = f"{self._api_base_url}/api/datasets/{self._dataset_repo}/tree/main/global_series/{year}"

        resp = requests.get(
            url, timeout=DEFAULT_REQUEST_TIMEOUT, headers=self._api_headers
        )
        resp.raise_for_status()

        files, size = 0, 0
        for item in resp.json():
            if item.get("type") != "file":
                continue

            files += 1
            size += item.get("size", 0)

        return year, files, size
class HfDatasetClient:

Client for accessing the AIICE Hugging Face dataset.

dataset_start: datetime.date

Earliest available date in the dataset.

dataset_end: datetime.date

Latest available date in the dataset.

shape: tuple[int, ...]

Shape of a single dataset sample.
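
A minimal usage sketch for the client and its properties (the constructor takes no arguments; repository settings come from aiice.constants):

from aiice.core.huggingface import HfDatasetClient

client = HfDatasetClient()

# read-only metadata about the dataset
print(client.dataset_start)  # earliest available date
print(client.dataset_end)    # latest available date
print(client.shape)          # shape of a single sample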

@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def info(self, per_year: bool = False, threads: int = 24) -> dict[str, Any]:

Collect dataset size statistics.

Arguments:
  • per_year (bool, optional): If True, include per-year file and size statistics. Defaults to False.
  • threads (int, optional): Number of threads used for parallel HTTP requests. Defaults to 24.
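
A short usage sketch (the keys of the returned dict are the KEY_* constants from aiice.constants, so the exact names are not spelled out here):

client = HfDatasetClient()

# aggregate totals only
stats = client.info()

# include a per-year breakdown, with fewer worker threads
stats = client.info(per_year=True, threads=8)

Per-year figures are fetched in parallel and cached (see _fetch_year_stats), so repeated calls within a process are cheap.
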
def get_filenames(self, start: datetime.date | None = None, end: datetime.date | None = None, step: int | str | None = None) -> list[str]:

Generate dataset filenames for a date range.

Arguments:
  • start (date, optional): Start date (inclusive). Defaults to dataset start.
  • end (date, optional): End date (inclusive). Defaults to dataset end.
  • step (int or str, optional): Step between files. If int, a number of days. If str, a format like "1d", "1w", "1m", "1y". For month or year steps ("1m", "2m", etc.), the date always lands on the last day of the month (e.g., Jan 31 + 1 month = Feb 28/29, then Mar 31). Defaults to 1 day.
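
A sketch of the stepping behavior, assuming the example dates fall inside the dataset's range (the exact filename layout comes from get_filename_template and is not shown here):

from datetime import date

client = HfDatasetClient()

# every day in January 2020 (step defaults to 1 day)
daily = client.get_filenames(start=date(2020, 1, 1), end=date(2020, 1, 31))

# one filename per week
weekly = client.get_filenames(start=date(2020, 1, 1), end=date(2020, 3, 1), step="1w")

# month-end snapshots: Jan 31, Feb 29, Mar 31, ...
monthly = client.get_filenames(start=date(2020, 1, 31), end=date(2020, 12, 31), step="1m")
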
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def read_file(self, filename: str) -> bytes | None:

Load a dataset file from Hugging Face into memory.

Arguments:
  • filename (str): Relative path to the dataset file.
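
read_file pairs naturally with get_filenames; missing files come back as None rather than raising, so gaps in the range can simply be skipped:

client = HfDatasetClient()

for filename in client.get_filenames(step="1w"):
    data = client.read_file(filename)
    if data is None:
        continue  # no file for this date
    print(filename, len(data), "bytes")
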
@retry_on_network_errors(retries=DEFAULT_RETRIES, backoff=DEFAULT_BACKOFF)
def download_file(self, filename: str, local_dir: str) -> str | None:

Download a dataset file to a local directory.

Arguments:
  • filename (str): Dataset file path.
  • local_dir (str): Target directory for download.
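
Usage mirrors read_file, except the file is written to disk and its local path is returned (or None if the file does not exist remotely). The target directory below is a hypothetical example:

client = HfDatasetClient()

path = client.download_file(
    filename=client.get_filenames(step="1y")[0],
    local_dir="./data",  # hypothetical target directory
)
if path is not None:
    print("saved to", path)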