# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module containing plugins for combining cubes"""
from operator import eq
from typing import Callable, List, Union
import iris
import numpy as np
from iris.coords import AuxCoord, CellMethod, DimCoord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from improver import BasePlugin
from improver.metadata.amend import update_diagnostic_name
from improver.metadata.check_datatypes import enforce_dtype
from improver.metadata.constants.time_types import TIME_COORDS
from improver.metadata.forecast_times import rebadge_forecasts_as_latest_cycle
from improver.metadata.probabilistic import find_threshold_coordinate
from improver.utilities.common_input_handle import as_cubelist
from improver.utilities.cube_manipulation import (
enforce_coordinate_ordering,
expand_bounds,
filter_realizations,
strip_var_names,
)
[docs]
class Combine(BasePlugin):
"""Combine input cubes.
Combine the input cubes into a single cube using the requested operation.
The first cube in the input list provides the template for output metadata.
If coordinates are expanded as a result of this combine operation
(e.g. expanding time for accumulations / max in period) the upper bound of
the new coordinate will also be used as the point for the new coordinate, unless
midpoint_bound is True, in which case the midpoint of the bounds is used instead.
"""
[docs]
def __init__(
self,
operation: str,
broadcast: str = None,
minimum_realizations: Union[str, int, None] = None,
new_name: str = None,
cell_method_coordinate: str = None,
expand_bound: bool = True,
midpoint_bound: bool = False,
use_latest_frt: bool = False,
):
r"""
Args:
operation (str):
An operation to use in combining input cubes. One of:
+, -, \*, add, subtract, multiply, min, max, mean
broadcast (str):
If specified, broadcast input cubes to the stated coord prior
to combining - the coord must already exist on the first input
cube.
minimum_realizations (int):
If specified, the input cubes will be filtered to ensure that
only realizations that include all available lead times are
combined. If the number of realizations that meet this criteria
are fewer than this integer, an error will be raised. Minimum
value is 1.
new_name (str):
New name for the resulting dataset.
cell_method_coordinate (str):
If specified, a cell method is added to the output with the
coordinate provided. This is only available for max, min and
mean operations.
expand_bound:
If True then scalar coord bounds will be extended to represent
all cubes being combined. For example a time coordinate will
be set using the latest time available as the point and with
bounds describing the range. If false the first cube provided
sets the metadata and the scalar coordinates from this cube
will be used.
midpoint_bound:
If True, set the coordinate point to the midpoint of the bounds;
otherwise, use the upper bound. This is only used if expand_bound is also True.
use_latest_frt:
If True then the latest forecast_reference_time available
across the input cubes will be used for the output with a
suitably updated forecast_period.
"""
try:
self.minimum_realizations = int(minimum_realizations)
except TypeError:
if minimum_realizations is not None:
raise
self.minimum_realizations = None
self.new_name = new_name
self.broadcast = broadcast
self.cell_method_coordinate = cell_method_coordinate
self.expand_bound = expand_bound
self.midpoint_bound = midpoint_bound
self.use_latest_frt = use_latest_frt
self.plugin = CubeCombiner(
operation,
cell_method_coordinate=cell_method_coordinate,
broadcast=self.broadcast,
expand_bound=self.expand_bound,
midpoint_bound=self.midpoint_bound,
)
[docs]
def process(self, *cubes: Union[Cube, CubeList]) -> Cube:
"""
Preprocesses the cubes, then passes them to the appropriate plugin
Args:
cubes (iris.cube.CubeList or list of iris.cube.Cube):
An iris CubeList to be combined.
Returns:
result (iris.cube.Cube):
Returns a cube with the combined data.
Raises:
TypeError:
If input list of cubes is empty
ValueError:
If minimum_realizations aren't met, or less than one were requested.
"""
cubes = as_cubelist(*cubes)
if self.new_name is None:
self.new_name = cubes[0].name()
if self.minimum_realizations is None:
filtered_cubes = cubes
else:
if self.minimum_realizations < 1:
raise ValueError(
f"Minimum realizations must be at least 1, not {self.minimum_realizations}"
)
cube = filter_realizations(cubes)
realization_count = len(cube.coord("realization").points)
if realization_count < self.minimum_realizations:
raise ValueError(
f"After filtering, number of realizations {realization_count} "
"is less than the minimum number of realizations allowed "
f"({self.minimum_realizations})"
)
filtered_cubes = cube.slices_over("time")
if self.use_latest_frt:
filtered_cubes = rebadge_forecasts_as_latest_cycle(filtered_cubes)
return self.plugin(CubeList(filtered_cubes), self.new_name)
[docs]
def masked_add(
masked_array: np.ma.MaskedArray, masked_array_2: np.ma.MaskedArray
) -> np.ma.MaskedArray:
"""
Operation to add two masked arrays treating masked points as 0.
Args:
masked_array (numpy.ma.MaskedArray):
An array that may be masked.
masked_array_2 (numpy.ma.MaskedArray):
An array that may be masked.
Returns:
numpy.ma.MaskedArray:
The sum of the two masked arrays with masked points treated as 0.
"""
new_array_1 = np.ma.filled(masked_array, 0)
new_array_2 = np.ma.filled(masked_array_2, 0)
new_mask = np.ma.getmask(masked_array) * np.ma.getmask(masked_array_2)
summed_cube = np.ma.MaskedArray(np.add(new_array_1, new_array_2), mask=new_mask)
return summed_cube
[docs]
class CubeCombiner(BasePlugin):
"""Plugin for combining cubes using linear operators"""
COMBINE_OPERATORS = {
"+": np.add,
"add": np.add,
"-": np.subtract,
"subtract": np.subtract,
"*": np.multiply,
"multiply": np.multiply,
"max": np.maximum,
"min": np.minimum,
"mean": np.add, # mean is calculated in two steps: sum and normalise
"masked_add": masked_add, # masked_add sums arrays but treats masked points as 0
}
[docs]
def __init__(
self,
operation: str,
cell_method_coordinate: str = None,
broadcast: str = None,
expand_bound: bool = True,
midpoint_bound: bool = False,
) -> None:
"""Create a CubeCombiner plugin
Args:
operation:
Operation (+, - etc) to apply to the incoming cubes.
cell_method_coordinate (str):
If specified, a cell method is added to the output with the
coordinate provided. This is only available for max, min and
mean operations.
broadcast (str):
If specified, broadcast input cubes to the stated coord prior
to combining - the coord must already exist on the first input
cube.
expand_bound:
If True then scalar coord bounds will be extended to represent
all cubes being combined. For example a time coordinate will
be set using the latest time available as the point and with
bounds describing the range. If false the first cube provided
sets the metadata and the scalar coordinates from this cube
will be used.
midpoint_bound:
If True, set the coordinate point to the midpoint of the bounds;
otherwise, use the upper bound. This is only used if expand_bound is also True.
Raises:
ValueError: if operation is not recognised in dictionary
"""
self.operation = operation
self.cell_method_coordinate = cell_method_coordinate
try:
self.operator = self.COMBINE_OPERATORS[operation]
except KeyError:
msg = "Unknown operation {}".format(operation)
raise ValueError(msg)
self.broadcast = broadcast
self.normalise = operation == "mean"
self.expand_bound = expand_bound
self.midpoint_bound = midpoint_bound
[docs]
@staticmethod
def _check_dimensions_match(
cube_list: Union[List[Cube], CubeList], comparators: List[Callable] = [eq]
) -> None:
"""
Check all coordinate dimensions on the input cubes match according to
the comparators specified.
The var_name attributes on input cubes and coordinates are ignored during these
checks, except where the attribute is required to support probabilistic metadata.
This is to ensure consistency of behaviour with the MergeCubes plugin in
/utilities/cube_manipulation.py.
Args:
cube_list:
List of cubes to compare
comparators:
Comparison operators, at least one of which must return "True"
for each coordinate in order for the match to be valid
Raises:
ValueError: If dimension coordinates do not match
"""
test_cube_list = iris.cube.CubeList(cube_list.copy())
strip_var_names(test_cube_list)
ref_coords = test_cube_list[0].coords(dim_coords=True)
for cube in test_cube_list[1:]:
coords = cube.coords(dim_coords=True)
compare = [
np.any([comp(a, b) for comp in comparators])
for a, b in zip(coords, ref_coords)
]
if not np.all(compare):
msg = (
"Cannot combine cubes with different dimensions:\n"
"{} and {}".format(repr(cube_list[0]), repr(cube))
)
raise ValueError(msg)
[docs]
@staticmethod
def _get_expanded_coord_names(cube_list: Union[List[Cube], CubeList]) -> List[str]:
"""
Get names of coordinates whose bounds need expanding and points
recalculating after combining cubes. These are the scalar coordinates
that are present on all input cubes, but have different values.
Args:
cube_list:
List of cubes to that will be combined
Returns:
List of coordinate names to expand
"""
shared_scalar_coords = {
coord.name() for coord in cube_list[0].coords(dim_coords=False)
}
for cube in cube_list[1:]:
cube_scalar_coords = {
coord.name() for coord in cube.coords(dim_coords=False)
}
shared_scalar_coords = shared_scalar_coords & cube_scalar_coords
expanded_coords = []
for cube in cube_list[1:]:
for coord in shared_scalar_coords:
if (
cube.coord(coord) != cube_list[0].coord(coord)
and coord not in expanded_coords
):
expanded_coords.append(coord)
return expanded_coords
[docs]
def _add_cell_method(self, cube: Cube) -> None:
"""Add a cell method to record the operation undertaken.
Args:
cube:
Cube to which a cell method will be added.
Raises:
ValueError: If a cell_method_coordinate is provided and the operation
is not max, min or mean.
"""
cell_method_lookup = {"max": "maximum", "min": "minimum", "mean": "mean"}
if self.operation in ["max", "min", "mean"] and self.cell_method_coordinate:
cube.add_cell_method(
CellMethod(
cell_method_lookup[self.operation],
coords=self.cell_method_coordinate,
)
)
elif self.cell_method_coordinate:
msg = (
"A cell method coordinate has been produced with "
f"operation: {self.operation}. A cell method coordinate "
"can only be added if the operation is max, min or mean."
)
raise ValueError(msg)
[docs]
def _combine_cube_data(self, cube_list: Union[List[Cube], CubeList]) -> Cube:
"""
Perform cumulative operation to combine cube data
Args:
cube_list
Returns:
Combined cube
Raises:
TypeError: if the operation results in an escalated datatype
"""
result = cube_list[0].copy()
# Slice over realization if possible to reduce memory usage.
if (
"realization" in [crd.name() for crd in result.coords(dim_coords=True)]
and self.broadcast != "realization"
):
rslices = iris.cube.CubeList(result.slices_over("realization"))
for cube in cube_list[1:]:
cslices = cube.slices_over("realization")
for rslice, cslice in zip(rslices, cslices):
rslice.data = self.operator(rslice.data, cslice.data)
result = rslices.merge_cube()
enforce_coordinate_ordering(
result, [d.name() for d in cube_list[0].coords(dim_coords=True)]
)
else:
for cube in cube_list[1:]:
result.data = self.operator(result.data, cube.data)
if self.normalise:
result.data = np.divide(result.data, len(cube_list))
enforce_dtype(str(self.operator), cube_list, result)
return result
[docs]
def _setup_coords_for_broadcast(self, cube_list: CubeList) -> CubeList:
"""
Adds a scalar threshold to any subsequent cube in cube_list so that they all
match the dimensions, in order, of the first cube in the list
Args:
cube_list
Returns:
Updated version of cube_list
Raises:
CoordinateNotFoundError: if there is no threshold coordinate on the
first cube in the list
TypeError: if there is a scalar threshold coordinate on any of the
later cubes, which would indicate that the cube is only valid for
a single threshold and should not be broadcast to all thresholds.
"""
target_cube = cube_list[0]
if self.broadcast == "threshold":
self.broadcast = find_threshold_coordinate(cube_list[0]).name()
try:
target_coord = target_cube.coord(self.broadcast)
except CoordinateNotFoundError:
raise CoordinateNotFoundError(
f"Cannot find coord {self.broadcast} in {repr(target_cube)} to broadcast to"
)
new_list = CubeList([])
for cube in cube_list:
try:
found_coord = cube.coord(target_coord)
except CoordinateNotFoundError:
new_coord = target_coord.copy([0], bounds=None)
cube = cube.copy()
cube.add_aux_coord(new_coord, None)
cube = iris.util.new_axis(cube, new_coord)
enforce_coordinate_ordering(
cube, [d.name() for d in target_cube.coords(dim_coords=True)]
)
else:
if found_coord not in cube.dim_coords:
msg = "Cannot broadcast to coord threshold as it already exists as an AuxCoord"
raise TypeError(msg)
new_list.append(cube)
return new_list
[docs]
@staticmethod
def _coords_are_broadcastable(coord1: DimCoord, coord2: DimCoord) -> bool:
"""
Broadcastable coords will differ only in length, so create a copy of one with
the points and bounds of the other and compare. Also ensure length of at least
one of the coords is 1.
"""
coord_copy = coord1.copy(coord2.points, bounds=coord2.bounds)
return (coord_copy == coord2) and (
(len(coord1.points) == 1) or (len(coord2.points) == 1)
)
[docs]
def process(
self, cube_list: Union[List[Cube], CubeList], new_diagnostic_name: str
) -> Cube:
"""
Combine data and metadata from a list of input cubes into a single
cube, using the specified operation to combine the cube data. The
first cube in the input list provides the template for the combined
cube metadata.
If coordinates are expanded as a result of this combine operation
(e.g. expanding time for accumulations / max in period) the upper bound
of the new coordinate will also be used as the point for the new coordinate.
Args:
cube_list:
List of cubes to combine.
new_diagnostic_name:
New name for the combined diagnostic.
Returns:
Cube containing the combined data.
Raises:
ValueError: If the cube_list contains only one cube.
"""
if len(cube_list) < 2:
msg = "Expecting 2 or more cubes in cube_list"
raise ValueError(msg)
if self.broadcast is not None:
cube_list = self._setup_coords_for_broadcast(cube_list=cube_list)
self._check_dimensions_match(
cube_list, comparators=[eq, self._coords_are_broadcastable]
)
else:
self._check_dimensions_match(cube_list, comparators=[eq])
result = self._combine_cube_data(cube_list)
expanded_coord_names = self._get_expanded_coord_names(cube_list)
if self.operation in ["*", "multiply"]:
update_diagnostic_name(cube_list[0], new_diagnostic_name, result)
else:
if expanded_coord_names and self.expand_bound:
result = expand_bounds(
result,
cube_list,
expanded_coord_names,
midpoint_bound=self.midpoint_bound,
)
result.rename(new_diagnostic_name)
self._add_cell_method(result)
return result
[docs]
class MaxInTimeWindow(BasePlugin):
"""Find the maximum within a time window for a period diagnostic. For example,
find the maximum 3-hour precipitation accumulation within a 24 hour window."""
[docs]
def __init__(self, minimum_realizations: Union[str, int, None] = None):
"""Initialise class.
Args:
minimum_realizations (int):
If specified, the input cubes will be filtered to ensure that only realizations that
include all available lead times are combined. If the number of realizations that
meet this criteria are fewer than this integer, an error will be raised.
Minimum value is 1.
"""
self.minimum_realizations = minimum_realizations
self.time_units_in_hours = TIME_COORDS["time"].units.replace("seconds", "hours")
[docs]
def _get_coords_in_hours(
self, cubes: List[Cube]
) -> List[Union[AuxCoord, DimCoord]]:
"""Get the time coordinates from the input cubes in units of hours
since 1970-01-01 00:00:00.
Args:
cubes: Cubes from which the time coordinates will be extracted.
Returns:
The time coordinates extracted from the input cubes.
"""
coords = [c.coord("time").copy() for c in cubes]
[c.convert_units(self.time_units_in_hours) for c in coords]
return coords
[docs]
def process(self, cubes: CubeList) -> Cube:
"""Compute the maximum probability or maximum diagnostic value within a
time window for a period diagnostic using the Combine plugin. The resulting
cube has a time coordinate with bounds that represent the time window whilst
the cell method has been updated to represent the period recorded on the input
cubes. For example, the time window might be 24 hours, whilst the period might
be 3 hours.
Args:
cubes (iris.cube.CubeList or list of iris.cube.Cube):
An iris CubeList to be combined.
Returns:
result (iris.cube.Cube):
Returns a cube with the combined data.
"""
coords_in_hours = self._get_coords_in_hours(cubes)
self._check_input_cubes(coords_in_hours)
cube = Combine("max", minimum_realizations=self.minimum_realizations)(cubes)
cube = self._correct_metadata(cube, coords_in_hours)
return cube