Source code for rook.operator

import pathlib
import tempfile
from copy import deepcopy

from rook.director import wrap_director
from rook.utils.average_utils import (
    run_average_by_dim,
    run_average_by_shape,
    run_average_by_time,
)
from rook.utils.concat_utils import run_concat
from rook.utils.input_utils import resolve_to_file_paths
from rook.utils.regrid_utils import run_regrid
from rook.utils.subset_utils import run_subset
from rook.utils.weighted_average_utils import run_weighted_average
from clisops.utils.file_utils import is_file_list, FileMapper


[docs] class Operator: # Sub-classes require "prefix" property prefix = NotImplemented def __init__(self, output_dir): if isinstance(output_dir, pathlib.Path): output_dir_ = output_dir.as_posix() else: output_dir_ = output_dir self.config = { "output_dir": output_dir_, # "apply_fixes": apply_fixes, # 'original_files': original_files # 'chunk_rules': dconfig.chunk_rules, # 'filenamer': dconfig.filenamer, }
[docs] def call(self, args): # args.update(self.config) args["output_dir"] = self._get_output_dir() collection = args["collection"] # collection is a list runner = self._get_runner() if is_file_list(collection): # This block is called if this is NOT the first stage of a workflow, and # the collection will be a file list (one or more files) args["apply_fixes"] = False kwargs = deepcopy(args) file_paths = resolve_to_file_paths(args.get("collection")) kwargs["collection"] = FileMapper(file_paths) output_uris = runner(kwargs) # this needs to be in a list else: # This block is called when this is the first stage of a workflow director = wrap_director(collection, args, runner) output_uris = director.output_uris return output_uris
def _get_runner(self): return NotImplementedError def _get_output_dir(self): return tempfile.mkdtemp(dir=self.config["output_dir"], prefix=f"{self.prefix}_")
[docs] class Subset(Operator): prefix = "subset" def _get_runner(self): return run_subset
[docs] class AverageByTime(Operator): prefix = "average_time" def _get_runner(self): return run_average_by_time
[docs] class AverageByDimension(Operator): prefix = "average" def _get_runner(self): return run_average_by_dim
[docs] class AverageByShape(Operator): prefix = "average_shape" def _get_runner(self): return run_average_by_shape
[docs] class WeightedAverage(Operator): prefix = "weighted_average" def _get_runner(self): return run_weighted_average
[docs] class Regrid(Operator): prefix = "regrid" def _get_runner(self): return run_regrid
[docs] class Concat(Operator): prefix = "concat" def _get_runner(self): return run_concat