Source code for rook.director.alignment

from pathlib import Path

from clisops.parameter import time_parameter
from clisops.utils.time_utils import to_isoformat
from clisops.project_utils import url_to_file_path

import xarray as xr


[docs] class SubsetAlignmentChecker: def __init__(self, input_files, inputs): self.input_files = sorted(input_files) self.is_aligned = False self.aligned_files = [] self._deduce_alignment(inputs) def _deduce_alignment(self, inputs): # At present, we reject alignment if any "time_components", "area", "shape" or "level" subset is requested if ( inputs.get("time_components", None) or inputs.get("area", None) or inputs.get("level", None) or inputs.get("shape", None) ): return time = inputs.get("time", None) # add in a catch for if time bounds are None # this means is_aligned = True and all files are needed if time is None: self.is_aligned = True self.aligned_files = self.input_files return else: start, end = time_parameter.TimeParameter(time).get_bounds() self._check_time_alignment(start, end) def _get_file_times(self, fpath): # get start and end times from the time dimension in the file # convert url to file path if needed if Path(fpath).as_posix().startswith("http"): fpath = url_to_file_path(fpath) try: time_coder = xr.coders.CFDatetimeCoder(use_cftime=True) ds = xr.open_dataset(fpath, decode_times=time_coder) except (AttributeError, TypeError): ds = xr.open_dataset(fpath, use_cftime=True) start = to_isoformat(ds.time.values[0]) end = to_isoformat(ds.time.values[-1]) ds.close() return start, end def _check_time_alignment(self, start, end): """ Check if data files can be aligned with start and end time. Loops through all data files to check if the `start` and `end` can be aligned with the exact start or end time in the file(s). If both the `start` and the `end` are aligned then the following properties are set: - self.aligned_files = [list of matching files in range] - self.is_aligned = True If the `start` is before the start time of the first file and/or the `end` is after the end time of the last file then that is considered a valid match to the required time range. """ # Set matches as a counter to see if we get valid time alignment. # Must result in matches==2 in order to be valid. matches = 0 # First of all truncate requested range to actual range if it extends # beyond the actual range in the files start_in_files, _ = self._get_file_times(self.input_files[0]) _, end_in_files = self._get_file_times(self.input_files[-1]) if start < start_in_files: start = start_in_files if end > end_in_files: end = end_in_files # Now go through files to check alignment for fpath in self.input_files: fstart, fend = self._get_file_times(fpath) # Break out if start of file is beyond end of requested range if fstart > end: break if fstart == start: matches += 1 if fend == end: matches += 1 if fstart >= start or end <= fend: self.aligned_files.append(fpath) if matches != 2: self.aligned_files.clear() return else: self.is_aligned = True return