# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module containing classes for doing weighted blending by collapsing a
whole dimension."""
import warnings
from typing import List, Optional, Union
import iris
import numpy as np
from iris.analysis import Aggregator
from iris.coords import AuxCoord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from numpy import ndarray
from improver import BasePlugin, PostProcessingPlugin
from improver.blending import MODEL_BLEND_COORD, MODEL_NAME_COORD
from improver.blending.utilities import find_blend_dim_coord, store_record_run_as_coord
from improver.metadata.constants import FLOAT_DTYPE, PERC_COORD
from improver.metadata.forecast_times import rebadge_forecasts_as_latest_cycle
from improver.utilities.complex_conversion import complex_to_deg, deg_to_complex
from improver.utilities.cube_manipulation import (
MergeCubes,
collapsed,
enforce_coordinate_ordering,
get_coord_names,
get_dim_coord_names,
sort_coord_in_cube,
)
class MergeCubesForWeightedBlending(BasePlugin):
"""Prepares cubes for cycle and grid blending"""
def __init__(
self,
blend_coord: str,
weighting_coord: Optional[str] = None,
model_id_attr: Optional[str] = None,
record_run_attr: Optional[str] = None,
) -> None:
"""
Initialise the class
Args:
blend_coord:
Name of coordinate over which blending will be performed. For
multi-model blending this is flexible to any string containing
"model". For all other coordinates this is prescriptive:
cube.coord(blend_coord) must return an iris.coords.Coord
instance for all cubes passed into the "process" method.
weighting_coord:
The coordinate across which weights will be scaled in a
multi-model blend.
model_id_attr:
Name of attribute used to identify model for grid blending.
record_run_attr:
Name of attribute used to record models and cycles blended.
Ignored if None.
Raises:
ValueError:
If trying to blend over model when model_id_attr is not set
"""
if "model" in blend_coord and model_id_attr is None:
raise ValueError(
"model_id_attr required to blend over {}".format(blend_coord)
)
# ensure model coordinates are not created for non-model blending
if "model" not in blend_coord and (
model_id_attr is not None and record_run_attr is None
):
warnings.warn(
"model_id_attr not required for blending over {} - "
"will be ignored".format(blend_coord)
)
model_id_attr = None
self.blend_coord = blend_coord
self.weighting_coord = weighting_coord
self.model_id_attr = model_id_attr
self.record_run_attr = record_run_attr
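    # A minimal construction sketch (hedged: the attribute name below is a
    # hypothetical choice for illustration, not prescribed by this module):
    #
    #     plugin = MergeCubesForWeightedBlending(
    #         "model_id",
    #         weighting_coord="forecast_period",
    #         model_id_attr="mosg__model_configuration",
    #     )
    #
    # Passing a blend_coord containing "model" without model_id_attr would
    # raise the ValueError documented above.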
def _create_model_coordinates(self, cubelist: Union[List[Cube], CubeList]) -> None:
"""
        Adds numerical model ID and string model configuration scalar
        coordinates to input cubes if self.model_id_attr is specified.
        The original model ID attribute is removed from each cube, as its
        value is transferred to the model configuration coordinate.
        Modifies cubes in place.
Args:
cubelist:
List of cubes to be merged for blending
Raises:
ValueError:
If self.model_id_attr is not present on all cubes
ValueError:
If input cubelist contains cubes from the same model
"""
model_titles = []
for i, cube in enumerate(cubelist):
if self.model_id_attr not in cube.attributes:
msg = (
"Cannot create model ID coordinate for grid blending "
'as "model_id_attr={}" was not found within the cube '
"attributes".format(self.model_id_attr)
)
raise ValueError(msg)
model_title = cube.attributes.pop(self.model_id_attr)
if model_title in model_titles:
raise ValueError(
"Cannot create model dimension coordinate with duplicate points"
)
model_titles.append(model_title)
new_model_id_coord = AuxCoord(
np.array([1000 * i], dtype=np.int32),
units="1",
long_name=MODEL_BLEND_COORD,
)
new_model_coord = AuxCoord(
[model_title], units="no_unit", long_name=MODEL_NAME_COORD
)
cube.add_aux_coord(new_model_id_coord)
cube.add_aux_coord(new_model_coord)
@staticmethod
def _remove_blend_time(cube: Cube) -> Cube:
"""If present on input, remove existing blend time coordinate (as this will
be replaced on blending)"""
if "blend_time" in get_coord_names(cube):
cube.remove_coord("blend_time")
return cube
@staticmethod
def _remove_deprecation_warnings(cube: Cube) -> Cube:
"""Remove deprecation warnings from forecast period and forecast reference
time coordinates so that these can be merged before blending"""
for coord in ["forecast_period", "forecast_reference_time"]:
try:
cube.coord(coord).attributes.pop("deprecation_message", None)
except CoordinateNotFoundError:
pass
return cube
def process(
self,
cubes_in: Union[List[Cube], Cube, CubeList],
cycletime: Optional[str] = None,
) -> Cube:
"""
Prepares merged input cube for cycle and grid blending. Makes updates to
metadata (attributes and time-type coordinates) ONLY in so far as these are
needed to ensure inputs can be merged into a single cube.
Args:
cubes_in:
Cubes to be merged.
cycletime:
The cycletime in a YYYYMMDDTHHMMZ format e.g. 20171122T0100Z.
Can be used in rationalise_blend_time_coordinates.
Returns:
Merged cube.
Raises:
ValueError:
If self.blend_coord is not present on all cubes (unless
blending over models)
"""
cubelist = (
[cubes_in.copy()]
if isinstance(cubes_in, iris.cube.Cube)
else [cube.copy() for cube in cubes_in]
)
if self.record_run_attr is not None and self.model_id_attr is not None:
store_record_run_as_coord(
cubelist, self.record_run_attr, self.model_id_attr
)
if "model" in self.blend_coord:
cubelist = [self._remove_blend_time(cube) for cube in cubelist]
cubelist = [self._remove_deprecation_warnings(cube) for cube in cubelist]
if (
self.weighting_coord is not None
and "forecast_period" in self.weighting_coord
):
# if blending models using weights by forecast period, unify
# forecast periods (assuming validity times are identical);
# method returns a new cubelist with copies of input cubes
cubelist = rebadge_forecasts_as_latest_cycle(
cubelist, cycletime=cycletime
)
# check all input cubes have the blend coordinate
for cube in cubelist:
if "model" not in self.blend_coord and not cube.coords(self.blend_coord):
                raise ValueError(
                    "{} coordinate is not present on all input cubes".format(
                        self.blend_coord
                    )
                )
# create model ID and model configuration coordinates if blending
# different models
if "model" in self.blend_coord and self.model_id_attr is not None:
self._create_model_coordinates(cubelist)
# merge resulting cubelist
result = MergeCubes()(cubelist, check_time_bounds_ranges=True)
return result
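# A usage sketch for the merge step (hypothetical cube variables; assumes two
# cycles of the same model with distinct forecast_reference_time points):
#
#     plugin = MergeCubesForWeightedBlending("forecast_reference_time")
#     merged = plugin.process([cycle_one_cube, cycle_two_cube])
#
# The merged cube gains a forecast_reference_time dimension over which
# WeightedBlendAcrossWholeDimension can then collapse.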
class PercentileBlendingAggregator:
"""Class for the percentile blending aggregator
    This class implements the method described in Combining Probabilities
    by Caroline Jones, 2017, which performs the blending in probability
    space.
The steps are:
        1. At each geographic point in the cube we take the percentile
           threshold values across the percentile dimension coordinate.
We recalculate, using linear interpolation, their probabilities
in the pdf of the other points across the coordinate we are
blending over. Thus at each point we have a set of thresholds
and their corresponding probability values in each of the
probability spaces across the blending coordinate.
2. We do a weighted blend across all the probability spaces,
combining all the thresholds in all the points in the coordinate
we are blending over. This gives us an array of thresholds and an
array of blended probabilities for each of the grid points.
3. We convert back to the original percentile values, again using
linear interpolation, resulting in blended values at each of the
original percentiles.
References:
:download:`Combining Probabilities by Caroline Jones, 2017
<../files/Combining_Probabilities.pdf>`
"""
@staticmethod
def aggregate(
data: ndarray, axis: int, percentiles: ndarray, arr_weights: ndarray
) -> ndarray:
"""
Function to blend percentile data over a given dimension.
The input percentile data must be provided with the blend coord as the
first axis and the percentile coord as the second (these are re-ordered
after aggregator initialisation at the start of this function). Weights
data must be provided with the blend coord as the first dimension.
Args:
data:
Array containing the data to blend.
axis:
The index of the coordinate dimension in the cube. This
dimension will be aggregated over.
percentiles:
                Array of percentile values e.g. [0, 20.0, 50.0, 70.0, 100.0],
same size as the percentile (second) dimension of data.
arr_weights:
Array of weights, same shape as data, but without the percentile
dimension (weights do not vary with percentile).
Note "weights" has special meaning in Aggregator, hence
using a different name for this variable.
Returns:
Array containing the weighted percentile blend data across
the chosen coord. The dimension associated with axis
has been collapsed, and the rest of the dimensions remain.
"""
# Iris aggregators support indexing from the end of the array.
if axis < 0:
axis += data.ndim
# Blend coordinate is moved to the -1 position in initialisation; move
# this back to the leading coordinate
data = np.moveaxis(data, [axis], [0])
if len(data) != len(arr_weights):
raise ValueError("Weights shape does not match data")
# Flatten data and weights over non-blend and percentile dimensions
grid_shape = data.shape[2:]
grid_points = np.prod(grid_shape, dtype=int)
flattened_shape = [data.shape[0], data.shape[1], grid_points]
data = data.reshape(flattened_shape)
weights_shape = [data.shape[0], grid_points]
arr_weights = arr_weights.reshape(weights_shape)
# Find the blended percentile values at each point in the flattened data
result = np.zeros(flattened_shape[1:], dtype=FLOAT_DTYPE)
for i in range(data.shape[-1]):
result[:, i] = PercentileBlendingAggregator.blend_percentiles(
data[:, :, i], percentiles, arr_weights[:, i]
)
# Reshape the data with a leading percentile dimension
shape = percentiles.shape + grid_shape
result = result.reshape(shape)
return result
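    # A shape sketch (illustrative sizes only): blending 3 sources of
    # 19-percentile data on a 200 x 200 grid, once the blend axis has been
    # moved to the front, the arrays are shaped as follows:
    #
    #     data:        (3, 19, 200, 200)   blend coord, then percentile
    #     arr_weights: (3, 200, 200)       no percentile dimension
    #     result:      (19, 200, 200)      blend dimension collapsed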
@staticmethod
def blend_percentiles(
perc_values: ndarray, percentiles: ndarray, weights: ndarray
) -> ndarray:
"""Blend percentiles function, to calculate the weighted blend across
a given axis of percentile data for a single grid point.
Args:
perc_values:
Array containing the percentile values to blend, with
shape: (length of coord to blend, num of percentiles)
percentiles:
                Array of percentile values e.g. [0, 20.0, 50.0, 70.0, 100.0],
same size as the percentile dimension of data.
weights:
Array of weights, same size as the axis dimension of data,
that we will blend over.
Returns:
Array containing the weighted percentile blend data
across the chosen coord
"""
inputs_to_blend = perc_values.shape[0]
combined_cdf = np.zeros((inputs_to_blend, len(percentiles)), dtype=FLOAT_DTYPE)
# Loop over the axis we are blending over finding the values for the
# probability at each threshold in the cdf, for each of the other
# points in the axis we are blending over.
# Then add the probabilities multiplied by the correct weight to the
# running total.
for i in range(0, inputs_to_blend):
interp_values = np.reshape(
np.interp(perc_values, perc_values[i], percentiles),
(inputs_to_blend, len(percentiles)),
)
interp_values[i] = percentiles
combined_cdf += interp_values * weights[i]
# Combine and sort the threshold values for all the points
# we are blending.
combined_perc_thres_data = np.sort(perc_values.flatten())
# Combine and sort blended probability values.
combined_perc_values = np.sort(combined_cdf.flatten())
# Find the percentile values from this combined data by interpolating
# back from probability values to the original percentiles.
new_combined_perc = np.interp(
percentiles, combined_perc_values, combined_perc_thres_data
).astype(FLOAT_DTYPE)
return new_combined_perc
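# A worked sketch of blend_percentiles (inputs are illustrative only).
# Blending two sources with equal weights:
#
#     percentiles = np.array([0.0, 50.0, 100.0])
#     perc_values = np.array([[1.0, 2.0, 3.0], [2.0, 3.0, 4.0]])
#     weights = np.array([0.5, 0.5])
#
# Interpolating each source's values into the other's CDF and taking the
# weighted sum gives combined probabilities [[0, 25, 75], [25, 75, 100]].
# Sorting the thresholds to [1, 2, 2, 3, 3, 4] against the sorted
# probabilities [0, 25, 25, 75, 75, 100] and interpolating back at
# [0, 50, 100] yields blended percentile values [1.0, 2.5, 4.0].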
class WeightedBlendAcrossWholeDimension(PostProcessingPlugin):
"""Apply a Weighted blend to a cube, collapsing across the whole
dimension. Uses one of two methods, either weighted average, or
the maximum of the weighted probabilities."""
def __init__(self, blend_coord: str, timeblending: bool = False) -> None:
"""Set up for a Weighted Blending plugin
Args:
blend_coord:
The name of the coordinate dimension over which the cube will
be blended.
timeblending:
With the default of False the cube being blended will be
checked to ensure that slices across the blending coordinate
all have the same validity time. Setting this to True will
bypass this test, as is necessary for triangular time
blending.
Raises:
ValueError: If the blend coordinate is "threshold".
"""
if blend_coord == "threshold":
msg = "Blending over thresholds is not supported"
raise ValueError(msg)
self.blend_coord = blend_coord
self.timeblending = timeblending
self.cycletime = None
self.crds_to_remove = None
def __repr__(self) -> str:
"""Represent the configured plugin instance as a string."""
description = (
"<WeightedBlendAcrossWholeDimension: coord = {}, timeblending: {}>"
)
return description.format(self.blend_coord, self.timeblending)
@staticmethod
def check_percentile_coord(cube: Cube) -> bool:
"""
Determines if the cube to be blended has a percentile dimension
coordinate.
Args:
cube:
The cube to be checked for a percentile coordinate.
Returns:
True if there is a multi-valued percentile dimension; False if not
Raises:
ValueError : If there is a percentile coord and it is not a
dimension coord in the cube.
ValueError : If there is a percentile dimension with only one
point, we need at least two points in order to do the blending.
"""
try:
perc_coord = cube.coord(PERC_COORD)
perc_dim = cube.coord_dims(PERC_COORD)
if not perc_dim:
msg = "The percentile coord must be a dimension of the cube."
raise ValueError(msg)
# Check the percentile coordinate has more than one point,
# otherwise raise an error as we won't be able to blend.
            if len(perc_coord.points) < 2:
msg = (
"Percentile coordinate does not have enough points"
" in order to blend. Must have at least 2 percentiles."
)
raise ValueError(msg)
return True
except CoordinateNotFoundError:
return False
def check_compatible_time_points(self, cube: Cube) -> None:
"""
Check that the time coordinate only contains a single time. Data
varying over the blending coordinate should all be for the same
validity time unless we are triangular time blending. In this case the
timeblending flag should be true and this function will not raise an
exception.
Args:
cube:
The cube upon which the compatibility of the time coords is
being checked.
Raises:
ValueError : If blending over forecast reference time on a cube
with multiple times.
"""
if self.timeblending:
return
time_points = cube.coord("time").points
if len(set(time_points)) > 1:
msg = (
"Attempting to blend data for different validity times. The"
" time points within the input cube are {}".format(time_points)
)
raise ValueError(msg)
@staticmethod
def shape_weights(cube: Cube, weights: Cube) -> ndarray:
"""
The function shapes weights to match the diagnostic cube. A cube of
weights that vary across the blending coordinate will be broadcast to
match the complete multidimensional cube shape. A multidimensional cube
of weights will be checked to ensure that the coordinate names match
between the two cubes. If they match the order will be enforced and
then the shape will be checked. If the shapes match the weights will be
returned as an array.
Args:
cube:
The data cube on which a coordinate is being blended.
weights:
Cube of blending weights.
Returns:
An array of weights that matches the cube data shape.
Raises:
ValueError: If weights cube coordinates do not match the diagnostic
cube in the case of a multidimensional weights cube.
ValueError: If weights cube shape is not broadcastable to the data
cube shape.
"""
# Check that a multidimensional weights cube has coordinates that match
        # the diagnostic cube. Checking names only so as not to be too exacting.
weight_dims = get_dim_coord_names(weights)
cube_dims = get_dim_coord_names(cube)
if set(weight_dims) == set(cube_dims):
enforce_coordinate_ordering(weights, cube_dims)
weights_array = weights.data.astype(FLOAT_DTYPE)
else:
# Map array of weights to shape of cube to collapse.
dim_map = []
dim_coords = [coord.name() for coord in weights.dim_coords]
# Loop through dim coords in weights cube and find the dim the
# coord relates to in the cube we are collapsing.
for dim_coord in dim_coords:
try:
dim_map.append(cube.coord_dims(dim_coord)[0])
except CoordinateNotFoundError:
message = (
"{} is a coordinate on the weights cube but it is not "
"found on the cube we are trying to collapse."
)
raise ValueError(message.format(dim_coord))
try:
weights_array = iris.util.broadcast_to_shape(
np.array(weights.data, dtype=FLOAT_DTYPE),
cube.shape,
tuple(dim_map),
)
except ValueError:
msg = (
"Weights cube is not a compatible shape with the"
" data cube. Weights: {}, Diagnostic: {}".format(
weights.shape, cube.shape
)
)
raise ValueError(msg)
return weights_array
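    # A broadcasting sketch (hedged; shapes are illustrative). A 1D weights
    # cube over the blend coordinate is broadcast against the full data cube:
    #
    #     cube.shape    == (3, 200, 200)   # blend coord leading
    #     weights.shape == (3,)
    #     shape_weights(cube, weights).shape == (3, 200, 200)
    #
    # dim_map here would be (0,), mapping the single weights dimension onto
    # the blend dimension of the data cube.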
@staticmethod
def _normalise_weights(weights: ndarray) -> ndarray:
"""
Checks that weights across the leading blend dimension sum up to 1. If
not, normalise across the blending dimension ignoring any points at which
the sum of weights is zero.
Args:
weights:
Array of weights shaped to match the data cube.
Returns:
Weights normalised along the (leading) blend dimension
"""
sum_of_weights = np.sum(weights, axis=0)
sum_of_non_zero_weights = sum_of_weights[sum_of_weights > 0]
if not (np.isclose(sum_of_non_zero_weights, 1)).all():
weights = np.where(
sum_of_weights > 0, np.divide(weights, sum_of_weights), 0
)
return weights
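    # A numeric sketch of the normalisation (values chosen for illustration):
    # two weights of 0.2 along the blend axis sum to 0.4, so each is rescaled
    # to 0.5; a column whose weights sum to zero is left as zeros.
    #
    #     weights = np.array([[0.2, 0.0], [0.2, 0.0]])
    #     _normalise_weights(weights)  # -> [[0.5, 0.0], [0.5, 0.0]]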
def get_weights_array(self, cube: Cube, weights: Optional[Cube]) -> ndarray:
"""
Given a 1 or multidimensional cube of weights, reshape and broadcast
these to the shape of the data cube. If no weights are provided, an
array of weights is returned that equally weights all slices across
the blending coordinate.
Args:
cube:
Template cube to reshape weights, with a leading blend coordinate
weights:
Cube of initial blending weights or None
Returns:
An array of weights that matches the template cube shape.
"""
if weights:
weights_array = self.shape_weights(cube, weights)
else:
(number_of_fields,) = cube.coord(self.blend_coord).shape
weight = FLOAT_DTYPE(1.0 / number_of_fields)
weights_array = np.broadcast_to(weight, cube.shape)
return weights_array
def percentile_weighted_mean(self, cube: Cube, weights: Optional[Cube]) -> Cube:
"""
Blend percentile data using the weights provided.
Args:
cube:
The cube which is being blended over self.blend_coord. Assumes
self.blend_coord and percentile are leading coordinates (enforced
in process).
weights:
Cube of blending weights.
Returns:
The cube with percentile values blended over self.blend_coord,
with suitable weightings applied.
"""
non_perc_slice = next(cube.slices_over(PERC_COORD))
weights_array = self.get_weights_array(non_perc_slice, weights)
weights_array = self._normalise_weights(weights_array)
# Set up aggregator
PERCENTILE_BLEND = Aggregator(
"mean", # Use CF-compliant cell method.
PercentileBlendingAggregator.aggregate,
)
cube_new = collapsed(
cube,
self.blend_coord,
PERCENTILE_BLEND,
percentiles=cube.coord(PERC_COORD).points,
arr_weights=weights_array,
)
return cube_new
def weighted_mean(self, cube: Cube, weights: Optional[Cube]) -> Cube:
"""
Blend data using a weighted mean using the weights provided. Circular
data identified with a unit of "degrees" are blended appropriately.
Args:
cube:
The cube which is being blended over self.blend_coord.
Assumes leading blend dimension (enforced in process)
weights:
Cube of blending weights or None.
Returns:
The cube with values blended over self.blend_coord, with
suitable weightings applied.
"""
# If units are degrees, convert degrees to complex numbers.
if cube.units == "degrees":
cube.data = deg_to_complex(cube.data)
weights_array = self.get_weights_array(cube, weights)
slice_dim = 1
allow_slicing = cube.ndim > 3
if allow_slicing:
cube_slices = cube.slices_over(slice_dim)
else:
cube_slices = [cube]
weights_slices = (
np.moveaxis(weights_array, slice_dim, 0)
if allow_slicing
else [weights_array]
)
result_slices = iris.cube.CubeList(
collapsed(c_slice, self.blend_coord, iris.analysis.MEAN, weights=w_slice)
for c_slice, w_slice in zip(cube_slices, weights_slices)
)
result = result_slices.merge_cube() if allow_slicing else result_slices[0]
# If units are degrees, convert complex numbers back to degrees.
if cube.units == "degrees":
result.data = complex_to_deg(result.data)
return result
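    # A circular-blending sketch (illustrative values): wind directions of
    # 350 and 10 degrees blended with equal weights yield 0 degrees, not the
    # naive arithmetic mean of 180, because the values are converted to unit
    # complex numbers before averaging and back to degrees afterwards.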
def process(self, cube: Cube, weights: Optional[Cube] = None) -> Cube:
"""Calculate weighted blend across the chosen coord, for either
probabilistic or percentile data. If there is a percentile
coordinate on the cube, it will blend using the
PercentileBlendingAggregator but the percentile coordinate must
have at least two points.
Args:
cube:
Cube to blend across the coord.
weights:
                Cube of blending weights. This will have one or three dimensions,
                corresponding to the blend dimension on the input cube, with or
                without two additional spatial dimensions. If None, the input cube
                is blended with equal weights across the blending dimension.
Returns:
            Cube containing the weighted blend across the chosen coordinate
            (typically forecast reference time or model).
Raises:
            TypeError : If the first argument is not a cube.
CoordinateNotFoundError : If coordinate to be collapsed not found
in cube.
CoordinateNotFoundError : If coordinate to be collapsed not found
in provided weights cube.
ValueError : If coordinate to be collapsed is not a dimension.
"""
if not isinstance(cube, iris.cube.Cube):
msg = (
"The first argument must be an instance of iris.cube.Cube "
"but is {}.".format(type(cube))
)
raise TypeError(msg)
if not cube.coords(self.blend_coord):
msg = "Coordinate to be collapsed not found in cube."
raise CoordinateNotFoundError(msg)
output_dims = get_dim_coord_names(next(cube.slices_over(self.blend_coord)))
self.blend_coord = find_blend_dim_coord(cube, self.blend_coord)
# Ensure input cube and weights cube are ordered equivalently along
# blending coordinate.
cube = sort_coord_in_cube(cube, self.blend_coord)
if weights is not None:
if not weights.coords(self.blend_coord):
msg = "Coordinate to be collapsed not found in weights cube."
raise CoordinateNotFoundError(msg)
weights = sort_coord_in_cube(weights, self.blend_coord)
# Check that the time coordinate is single valued if required.
self.check_compatible_time_points(cube)
# Do blending and update metadata.
if self.check_percentile_coord(cube):
enforce_coordinate_ordering(cube, [self.blend_coord, "percentile"])
result = self.percentile_weighted_mean(cube, weights)
else:
enforce_coordinate_ordering(cube, [self.blend_coord])
result = self.weighted_mean(cube, weights)
# Reorder resulting dimensions to match input
enforce_coordinate_ordering(result, output_dims)
return result
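# An end-to-end usage sketch (hypothetical cube variables; the merged cube is
# assumed to carry a multi-valued forecast_reference_time coordinate):
#
#     merged = MergeCubesForWeightedBlending("forecast_reference_time").process(
#         [cycle_one_cube, cycle_two_cube]
#     )
#     blended = WeightedBlendAcrossWholeDimension("forecast_reference_time").process(
#         merged
#     )
#
# Passing weights=None blends the cycles with equal weights; a weights cube
# with a matching forecast_reference_time coordinate applies unequal weights.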