Source code for rook.utils.weighted_average_utils

from collections.abc import Sequence
from pathlib import Path

import numpy as np
import xarray as xr


from clisops.parameter.dimension_parameter import DimensionParameter

from clisops.ops.average import Average as ClisopsAverage
from clisops.utils.file_namers import get_file_namer
from rook.utils.ops.average import Average as OpsAverage


[docs] def calc_weighted_mean(ds): # fix cftime calendar ds["time"] = ds.indexes["time"].to_numpy() ds = ds.drop_vars(["time_bnds"], errors="ignore") # Generate weights with both lat & lon dimensions weights = np.cos(np.deg2rad(ds.lat)) weights = weights / weights.sum() # Normalize weights = weights.broadcast_like(ds) # Ensure shape matches ds weights.name = "weights" # apply weights ds_weighted = ds.weighted(weights) # apply mean ds_weighted_mean = ds_weighted.mean(["lat", "lon"], keep_attrs=True) return ds_weighted_mean
[docs] class WeightedAverage_(ClisopsAverage): # noqa: N801 def _get_file_namer(self): extra = "_w-avg" namer = get_file_namer(self._file_namer)(extra=extra) return namer def _calculate(self): avg_ds = calc_weighted_mean(self.ds) return avg_ds
def _weighted_average( ds: xr.Dataset | str, dims: Sequence[str] | DimensionParameter | None = None, ignore_undetected_dims: bool = False, output_dir: str | Path | None = None, output_type: str = "netcdf", split_method: str = "time:auto", file_namer: str = "standard", ) -> list[xr.Dataset | str]: op = WeightedAverage_(**locals()) return op.process()
[docs] def run_weighted_average(args): class _Average(OpsAverage): def get_operation_callable(self): return _weighted_average return _Average(**args).calculate().file_uris