Source code for rook.catalog.intake

"""Utilities for working with Intake catalogs."""

from urllib.parse import urlparse

import fsspec
import intake

from rook import CONFIG

from .base import Catalog
from .util import MAX_DATETIME, MIN_DATETIME, parse_time

# Last-resort catalog location: IntakeCatalog uses it only when neither the
# ``url`` argument nor the ``catalog.intake_catalog_url`` config entry is set.
DEFAULT_INTAKE_CATALOG_URL = (
    "https://raw.githubusercontent.com"
    "/cp4cds/c3s_34g_manifests/master"
    "/intake/catalogs/c3s.yaml"
)


class IntakeCatalog(Catalog):
    """Catalog implementation backed by an Intake catalog.

    The Intake catalog is opened lazily on first access of :attr:`catalog`,
    and the table read for the configured project is cached per instance.
    """

    def __init__(self, project, url=None):
        """Create a catalog for ``project``.

        Parameters
        ----------
        project
            Project identifier; forwarded to the base class and later used
            as the key into the Intake catalog (via ``self.project``, which
            is presumably set by the base class -- confirm).
        url : str, optional
            Location of the Intake catalog. When falsy, the
            ``catalog.intake_catalog_url`` entry of ``CONFIG`` is used, and
            if that is falsy too, ``DEFAULT_INTAKE_CATALOG_URL``.
        """
        super().__init__(project)
        self.url = (
            url
            or CONFIG.get("catalog", {}).get("intake_catalog_url")
            or DEFAULT_INTAKE_CATALOG_URL
        )
        # Lazily opened Intake catalog object (see ``catalog``).
        self._cat = None
        # Cache of tables already read, keyed by project name.
        self._store = {}

    @property
    def catalog(self):
        """Return the Intake catalog, opening it on first access."""
        # Test against None explicitly: "not opened yet" must not be confused
        # with a catalog object that happens to evaluate as falsy, which
        # would otherwise be reopened on every access.
        if self._cat is None:
            parsed_url = urlparse(self.url)
            is_http_catalog = parsed_url.scheme in {"http", "https"}
            if is_http_catalog:
                # trust_env is handed to the HTTP client; presumably this
                # makes it honour proxy settings from the environment --
                # confirm against the fsspec/aiohttp documentation.
                fs = fsspec.filesystem("http", client_kwargs={"trust_env": True})
                try:
                    # Disable shell-based default expansion in intake catalogs.
                    # Ref: https://github.com/roocs/rook/security/dependabot/3
                    self._cat = intake.open_catalog(self.url, fs=fs, getshell=False)
                except TypeError:
                    # Keep compatibility with intake variants that do not
                    # accept fs.
                    self._cat = intake.open_catalog(self.url, getshell=False)
            else:
                self._cat = intake.open_catalog(self.url, getshell=False)
        return self._cat

    def load(self):
        """Read the catalog entry for this project and cache the result.

        Returns
        -------
        The object returned by the entry's ``read()``. ``_query`` uses it as
        a pandas DataFrame with ``ds_id``, ``path``, ``start_time`` and
        ``end_time`` columns.
        """
        if self.project not in self._store:
            project_catalog = self.catalog[self.project]
            # Avoid stale transport options on nested catalogs for some
            # partner deployments.
            if hasattr(project_catalog, "_storage_options"):
                project_catalog._storage_options = None
            self._store[self.project] = project_catalog.read()
        return self._store[self.project]

    def _query(self, collection, time=None, time_components=None):
        """Return the paths of datasets in ``collection`` overlapping a time range.

        Parameters
        ----------
        collection
            List-like of dataset ids to match against the ``ds_id`` column.
        time, time_components
            Passed unchanged to ``parse_time``, which yields the ``start``
            and ``end`` bounds compared against the table.

        Returns
        -------
        dict
            Maps each matching ``ds_id`` to the list of its ``path`` values,
            in table order.
        """
        df = self.load()
        start, end = parse_time(time, time_components)

        # Workaround for NaN values when there is no time axis (fx datasets):
        # treat them as covering the whole supported time range.
        df = df.fillna({"start_time": MIN_DATETIME, "end_time": MAX_DATETIME})
        # Needed when the catalog was created by catalog_maker, which writes
        # "undefined" instead of NaN; the fillna above can eventually go.
        df = df.replace({"start_time": {"undefined": MIN_DATETIME}})
        df = df.replace({"end_time": {"undefined": MAX_DATETIME}})

        # Keep rows of the requested datasets whose [start_time, end_time]
        # interval intersects [start, end].
        result = df.loc[
            (df.ds_id.isin(collection))
            & (df.end_time >= start)
            & (df.start_time <= end)
        ]

        # Group paths by dataset id. Zipping the two needed columns avoids
        # building a Series per row as iterrows() does, while keeping the
        # same first-seen key order and per-key path order.
        records = {}
        for ds_id, path in zip(result["ds_id"], result["path"]):
            records.setdefault(ds_id, []).append(path)
        return records