Source code for rook.director.director

from collections import OrderedDict

from pywps.app.exceptions import ProcessError
from clisops.project_utils import get_project_name
from clisops.utils.file_utils import FileMapper

from rook import CONFIG
from clisops.exceptions import InvalidCollection
from rook.catalog import get_catalog

from ..utils.input_utils import clean_inputs
from .alignment import SubsetAlignmentChecker
from .compat import ResultSet, is_characterised


[docs] def wrap_director(collection, inputs, runner): # Ask director whether request should be rejected, use original files or apply WPS process try: director = Director(collection, inputs) director.process(runner) return director except Exception as e: raise ProcessError(f"{e}")
[docs] class Director: def __init__(self, coll, inputs): self.coll = coll self.inputs = inputs self.project = get_project_name(coll[0]) self.use_original_files = False self.original_file_urls = None self.output_uris = None self.search_result = None if CONFIG[f"project:{self.project}"].get("use_catalog"): try: self.catalog = get_catalog(self.project) except Exception: raise InvalidCollection() self._resolve() # if enabled for the project then check if a fix will be applied self._check_apply_fixes()
[docs] def use_fixes(self): # TODO: don't use fixes return False
# return CONFIG[f"project:{self.project}"].get("use_fixes", False) def _check_apply_fixes(self): if ( self.use_fixes() and self.inputs.get("apply_fixes") and not self.use_original_files and self.requires_fixes() ): self.inputs["apply_fixes"] = True else: self.inputs["apply_fixes"] = False def _resolve(self): """ Resolve how the WPS will handle this request. Steps through the following: - Are all datasets in the inventory? If NO: raise Exception - Does the user want to access original files only? If YES: return (and use original files) - Does the user require data to be pre-checked AND has the collection been pre-checked? If NO: raise Exception - Does the user want to apply fixes AND fixes are required for this collection? If YES: return (and use WPS) - Does the requested temporal subset align with files in all datasets in this collection? If YES: return (and use original files) If NO: return (and use WPS) Raises ------ ProcessError: [description] ProcessError: [description] """ # search self.search_result = self.catalog.search( collection=self.coll, time=self.inputs.get("time"), time_components=self.inputs.get("time_components"), ) # Raise exception if any of the dataset ids is not in the inventory if len(self.search_result) != len(self.coll): raise InvalidCollection() # If original files are requested then go straight there if ( self.inputs.get("original_files") or self.project == "c3s-ipcc-atlas" # or self.project == "c3s-cica-atlas" ): self.original_file_urls = self.search_result.download_urls() self.use_original_files = True return # Raise exception if "pre_checked" selected but data has not been characterised by dachar if self.inputs.get("pre_checked") and not is_characterised( self.coll, require_all=True ): raise ProcessError("Data has not been pre-checked") # Check if fixes are required. If so, then return (and subset will be generated). if self.inputs.get("apply_fixes") and self.requires_fixes(): return # TODO: quick fix for average, regrid and concat. Don't use original files for these operators. if "dims" in self.inputs or "freq" in self.inputs or "grid" in self.inputs: return # Finally, check if the subset requirements can align with whole datasets if self.request_aligns_with_files(): # This call sets values for self.original_file_urls AND self.use_original_files pass # If we got here: then WPS will be used, because `self.use_original_files == False`
[docs] def requires_fixes(self): # TODO: don't use fixes return False
# if not self.use_fixes(): # return False # if self.search_result: # ds_ids = self.search_result.files() # else: # ds_ids = self.coll # for ds_id in ds_ids: # fix = fixer.Fixer(ds_id) # if fix.pre_processor or fix.post_processors: # return True # return False
[docs] def request_aligns_with_files(self): """Return whether collection files already align with the requested subset.""" required_files = OrderedDict() for ds_id, urls in self.search_result.download_urls().items(): sac = SubsetAlignmentChecker(urls, self.inputs) # TODO: don't use original files for atlas data ... need to apply a fix # if not sac.is_aligned or "c3s-cica-atlas" in ds_id: if not sac.is_aligned: self.use_original_files = False self.original_file_urls = None return False required_files[ds_id] = sac.aligned_files[:] # If we got here, then we have full alignment so set the properties and return True self.use_original_files = True self.original_file_urls = required_files return True
[docs] def process(self, runner): # Either packages up original files (URLs) or # runs the process to generate the outputs # If original files should be returned, then add the files if self.use_original_files: result = ResultSet() for ds_id, file_urls in self.original_file_urls.items(): result.add(ds_id, file_urls) file_uris = result.file_uris # else: generate the new subset of files else: clean_inputs(self.inputs) # use search result if available if self.search_result: self.inputs["collection"] = [] for file_uris in self.search_result.files().values(): self.inputs["collection"].append(FileMapper(file_uris)) try: file_uris = runner(self.inputs) except Exception as e: raise ProcessError(f"{e}") # print("orig files", self.use_original_files) # print("uris", file_uris) self.output_uris = file_uris