# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module to create the weights used to blend data."""
import copy
from typing import Any, Dict, List, Optional, Tuple, Union
import cf_units
import iris
import numpy as np
from cf_units import Unit
from iris.coords import Coord
from iris.cube import Cube, CubeList
from numpy import ndarray
from scipy.interpolate import interp1d
from improver import BasePlugin
from improver.blending import MODEL_BLEND_COORD, MODEL_NAME_COORD
from improver.utilities.cube_manipulation import (
check_cube_coordinates,
sort_coord_in_cube,
)
[docs]
class WeightsUtilities:
"""Utilities for Weight processing."""
def __repr__(self) -> str:
"""Represent the configured plugin instance as a string."""
result = "<WeightsUtilities>"
return result
[docs]
@staticmethod
def normalise_weights(weights: ndarray, axis: Optional[int] = None) -> ndarray:
"""Ensures all weights add up to one.
Args:
weights:
array of weights
axis:
The axis that we want to normalise along for a multiple
dimensional array. Defaults to None, meaning the whole
array is used for the normalisation.
Returns:
array of weights where sum = 1.0
Raises:
ValueError: any negative weights are found in input.
ValueError: sum of weights in the input is 0.
"""
if np.any(weights.min(axis=axis) < 0.0):
msg = (
"Weights must be positive. The weights have at least one "
"value < 0.0: {}".format(weights)
)
raise ValueError(msg)
sumval = np.sum(weights, axis=axis, keepdims=True)
if np.any(sumval == 0):
msg = "Sum of weights must be > 0.0"
raise ValueError(msg)
normalised_weights = weights / sumval
return normalised_weights
[docs]
@staticmethod
def build_weights_cube(cube: Cube, weights: ndarray, blending_coord: str) -> Cube:
"""Build a cube containing weights for use in blending.
Args:
cube:
The cube that is being blended over blending_coord.
weights:
Array of weights
blending_coord:
Name of the coordinate over which the weights will be used
to blend data, e.g. across model name when grid blending.
Returns:
A cube containing the array of weights.
Raises:
ValueError : If weights array is not of the same length as the
coordinate being blended over on cube.
"""
if len(weights) != len(cube.coord(blending_coord).points):
msg = (
"Weights array provided is not the same size as the "
"blending coordinate; weights shape: {}, blending "
"coordinate shape: {}".format(
len(weights), len(cube.coord(blending_coord).points)
)
)
raise ValueError(msg)
try:
weights_cube = next(cube.slices(blending_coord))
except ValueError:
weights_cube = iris.util.new_axis(cube, blending_coord)
weights_cube = next(weights_cube.slices(blending_coord))
weights_cube.attributes = None
# Find dim associated with blending_coord and don't remove any coords
# associated with this dimension.
blending_dim = cube.coord_dims(blending_coord)
defunct_coords = [
crd.name()
for crd in cube.coords(dim_coords=True)
if not cube.coord_dims(crd) == blending_dim
]
for crd in defunct_coords:
weights_cube.remove_coord(crd)
weights_cube.data = weights
weights_cube.rename("weights")
weights_cube.units = 1
return weights_cube
[docs]
class ChooseWeightsLinear(BasePlugin):
"""Plugin to interpolate weights linearly to the required points, where
original weights are provided as a configuration dictionary"""
[docs]
def __init__(
self,
weighting_coord_name: str,
config_dict: Dict[str, Dict[str, Any]],
config_coord_name: str = MODEL_NAME_COORD,
) -> None:
"""
Set up for calculating linear weights from a dictionary or input cube
Args:
weighting_coord_name:
Standard name of the coordinate along which the weights will be
interpolated. For example, if the intention is to provide
weights varying with forecast period, then this argument would
be "forecast_period". This coordinate must be included within
the configuration dictionary.
config_dict:
Dictionary containing the configuration information, namely
an initial set of weights and information regarding the
points along the specified coordinate at which the weights are
valid. An example dictionary is shown below.
config_coord_name:
Name of the coordinate used to select the configuration.
For example, if the intention is to create weights that scale
differently with the weighting_coord for different models, then
MODEL_NAME_COORD would be the config_coord.
Dictionary of format::
{
"uk_det": {
"forecast_period": [7, 12],
"weights": [1, 0],
"units": "hours"
}
"uk_ens": {
"forecast_period": [7, 12, 48, 54],
"weights": [0, 1, 1, 0],
"units": "hours"
}
}
To assign a different constant weight to each model, choose any coordinate
for "weighting_coord_name" and choose any two points for its value in the
dictionary. Set the value of "weights" to be the same for both points in
each model's dictionary. For example, we can assign weights of 0.3 and 0.7
to models uk_det and uk_ens as follows::
{
"uk_det": {
"forecast_period": [0, 48],
"weights": [0.3, 0.3],
"units": "hours"
}
"uk_ens": {
"forecast_period": [0, 48],
"weights": [0.7, 0.7],
"units": "hours"
}
}
"""
self.weighting_coord_name = weighting_coord_name
self.config_coord_name = config_coord_name
self.config_dict = config_dict
self.weights_key_name = "weights"
self._check_config_dict()
def __repr__(self) -> str:
"""Represent the plugin instance as a string"""
msg = (
"<ChooseWeightsLinear(): weighting_coord_name = {}, "
"config_coord_name = {}, config_dict = {}>".format(
self.weighting_coord_name, self.config_coord_name, str(self.config_dict)
)
)
return msg
[docs]
def _check_config_dict(self) -> None:
"""
Check whether the items within the configuration dictionary
are present and of matching lengths.
Raises:
ValueError: If items within the configuration dictionary are
not of matching lengths.
KeyError: If the required items are not present in the
configuration dictionary.
"""
# Check all keys
for key in self.config_dict.keys():
weighting_len = len(self.config_dict[key][self.weighting_coord_name])
weights_len = len(self.config_dict[key][self.weights_key_name])
if weighting_len != weights_len:
msg = (
"{} is {}, {} is {}."
"These items in the configuration dictionary "
"have different lengths i.e. {} != {}".format(
self.weighting_coord_name,
self.config_dict[key][self.weighting_coord_name],
self.weights_key_name,
self.config_dict[key][self.weights_key_name],
weighting_len,
weights_len,
)
)
raise ValueError(msg)
[docs]
@staticmethod
def _interpolate_to_find_weights(
source_points: ndarray,
target_points: ndarray,
source_weights: ndarray,
fill_value: Tuple[int, int],
axis: int = 0,
) -> ndarray:
"""
Use of scipy.interpolate.interp1d to interpolate source_weights
(valid at source_points) onto target_points grid. This allows
the specification of an axis for the interpolation, so that the
source_weights can be a multi-dimensional numpy array.
Args:
source_points:
Points within the configuration dictionary that will
be used as the input to the interpolation.
target_points:
Points within the cube that will be the target points
for the interpolation.
source_weights:
Weights from the configuration dictionary that will be
used as the input to the interpolation.
fill_value:
Values to be used if extrapolation is required. The
fill values are used for target_points that are outside
the source_points grid.
axis:
Axis along which the interpolation will occur.
Returns:
Weights corresponding to target_points following interpolation.
"""
f_out = interp1d(
source_points,
source_weights,
axis=axis,
fill_value=fill_value,
bounds_error=False,
)
weights = f_out(target_points)
return weights
[docs]
def _create_new_weights_cube(self, cube: Cube, weights: ndarray) -> Cube:
"""Create a cube to contain the output of the interpolation.
It is currently assumed that the output weights matches the size
of the input cube.
Args:
cube:
Cube containing the coordinate information that will be used
for setting up the new_weights_cube.
weights:
Weights calculated following interpolation.
Returns:
Cube containing the output from the interpolation. This has
the same shape as "cube", without the x and y dimensions.
"""
spatial = [cube.coord(axis="y"), cube.coord(axis="x")]
cubelist = iris.cube.CubeList([])
for cube_slice, weight in zip(
cube.slices_over(self.weighting_coord_name), weights
):
sub_slice = next(cube_slice.slices_over(spatial))
sub_slice.data = np.ones(sub_slice.data.shape) * weight
cubelist.append(sub_slice)
# re-order dimension coordinates to match input cube
new_weights_cube = check_cube_coordinates(
next(cube.slices_over(spatial)), cubelist.merge_cube()
)
# remove all scalar coordinates that are not time-, model- or
# blend-related
dim_coords = new_weights_cube.coords(dim_coords=True)
keep_coords = [
"time",
"forecast_period",
"forecast_reference_time",
MODEL_BLEND_COORD,
MODEL_NAME_COORD,
self.weighting_coord_name,
self.config_coord_name,
]
for coord in new_weights_cube.coords():
if coord not in dim_coords and coord.name() not in keep_coords:
new_weights_cube.remove_coord(coord)
# remove attributes
new_weights_cube.attributes = {}
# rename cube
new_weights_cube.rename(self.weights_key_name)
new_weights_cube.units = cf_units.Unit("1")
return new_weights_cube
[docs]
def _calculate_weights(self, cube: Cube) -> Cube:
"""Method to wrap the calls to other methods to support calculation
of the weights by interpolation.
Args:
cube:
Cube containing the coordinate information that will be used
for setting up the interpolation and create the new weights
cube.
Returns:
Cube containing the output from the interpolation. This
has been renamed using the self.weights_key_name but
otherwise matches the input cube.
"""
(source_points, target_points, source_weights, fill_value) = (
self._get_interpolation_inputs_from_dict(cube)
)
axis = 0
weights = self._interpolate_to_find_weights(
source_points, target_points, source_weights, fill_value, axis=axis
)
new_weights_cube = self._create_new_weights_cube(cube, weights)
return new_weights_cube
[docs]
def _define_slice(self, cube: Cube) -> List[Coord]:
"""
Returns a list of coordinates over which to slice the input cube to
create a list of cubes for blending.
Args:
cube:
Cube input to plugin
Returns:
List of coordinates defining the slice to iterate over
"""
if cube.coord_dims(self.weighting_coord_name):
slice_list = [
cube.coord(self.weighting_coord_name),
cube.coord(axis="y"),
cube.coord(axis="x"),
]
else:
slice_list = [cube.coord(axis="y"), cube.coord(axis="x")]
# To handle non-orthogonal spatial coordinates, i.e. multiple coordinates
# that share the same dimension, as in a spot-forecast.
unique_slice_list = []
for dim in set([cube.coord_dims(crd) for crd in slice_list]):
unique_slice_list.append(cube.coords(dimensions=dim)[0])
return unique_slice_list
[docs]
def process(self, cubes: Union[Cube, CubeList]) -> Cube:
"""Calculation of linear weights based on an input dictionary.
Args:
cubes:
Cubes containing the coordinate (source point) information
that will be used for setting up the interpolation. Each cube
should have "self.config_coord_name" as a scalar dimension; if
a merged cube is passed in, the plugin will split this into a
list cubes.
Returns:
Cube containing the output from the interpolation.
DimCoords (such as model_id) will be in sorted-ascending order.
"""
# create 2D cube lists with relevant dimensions only for dict
# processing
cubes = self._slice_input_cubes(cubes)
# calculate weights
cube_slices = iris.cube.CubeList([])
for cube in cubes:
new_weights_cube = self._calculate_weights(cube)
cube_slices.append(new_weights_cube)
# normalise weights
new_weights_cube = cube_slices.merge_cube()
axis = new_weights_cube.coord_dims(self.config_coord_name)
new_weights_cube.data = WeightsUtilities.normalise_weights(
new_weights_cube.data, axis=axis
)
return new_weights_cube
[docs]
class ChooseDefaultWeightsLinear(BasePlugin):
"""Calculate Default Weights using Linear Function."""
[docs]
def __init__(self, y0val: float, ynval: float) -> None:
"""
Set up for calculating default weights using linear function.
Args:
y0val:
Relative weight of first point. Must be positive.
ynval:
Relative weight of last point.
"""
if y0val is None or ynval is None:
raise ValueError(
"y0val and ynval are required arguments to the "
"ChooseDefaultWeightsLinear plugin"
)
if y0val < 0.0:
msg = "y0val must be a float >= 0.0, y0val = {0:s}".format(str(y0val))
raise ValueError(msg)
self.y0val = float(y0val)
self.ynval = float(ynval)
[docs]
def linear_weights(self, num_of_weights: int) -> ndarray:
"""Create linear weights
Args:
num_of_weights:
Number of weights to create.
Returns:
array of weights, sum of all weights = 1.0
"""
# Special case num_of_weights == 1 i.e. Scalar coordinate.
if num_of_weights == 1:
weights = np.array([1.0], dtype=np.float32)
return weights
slope = (self.ynval - self.y0val) / (num_of_weights - 1.0)
weights_list = []
for tval in range(0, num_of_weights):
weights_list.append(slope * tval + self.y0val)
weights = WeightsUtilities.normalise_weights(
np.array(weights_list, dtype=np.float32)
)
return weights
[docs]
def process(self, cube: Cube, coord_name: str) -> Cube:
"""
Calculated weights for a given cube and coord. Weights scale linearly
between self.y0val and self.ynval for the cube provided in ascending
order of blend coordinate. self.y0val = self.ynval gives equal
weightings across all input fields.
Args:
cube:
Cube to blend across the coord.
coord_name:
Name of coordinate in the cube to be blended.
Returns:
1D cube of normalised (sum = 1.0) weights matching length
of input dimension to be blended
Raises:
TypeError : input is not a cube
"""
if not isinstance(cube, iris.cube.Cube):
msg = (
"The first argument must be an instance of "
"iris.cube.Cube but is"
" {0:s}".format(str(type(cube)))
)
raise TypeError(msg)
weights = self.linear_weights(len(cube.coord(coord_name).points))
weights_cube = WeightsUtilities.build_weights_cube(cube, weights, coord_name)
return weights_cube
def __repr__(self):
"""Represent the configured plugin instance as a string."""
desc = "<ChooseDefaultWeightsLinear y0val={:4.1f}, ynval={:4.1f}>".format(
self.y0val, self.ynval
)
return desc
[docs]
class ChooseDefaultWeightsNonLinear(BasePlugin):
"""Calculate Default Weights using NonLinear Function."""
[docs]
def __init__(self, cval: float) -> None:
"""
Set up for calculating default weights using non-linear function.
Args:
cval:
Value greater than 0, less than equal 1.0. Weights are
calculated for input cubes in order such that the first has
weight cval**0, then cval**1, cval**2, etc. The weights are
then re-normalised. Thus a value of 1 gives equal weighting
across all input fields.
Raises:
ValueError: an inappropriate value of cval is input.
"""
if cval is None:
raise ValueError(
"cval is a required argument to the "
"ChooseDefaultWeightsNonLinear plugin"
)
if cval <= 0.0 or cval > 1.0:
msg = (
"cval must be greater than 0.0 and less "
"than or equal to 1.0 cval = {}".format(cval)
)
raise ValueError(msg)
self.cval = cval
[docs]
def nonlinear_weights(self, num_of_weights: int) -> ndarray:
"""
Create nonlinear weights.
Args:
num_of_weights:
Number of weights to create
Returns:
Normalised array of weights
"""
weights_list = []
for tval_minus1 in range(0, num_of_weights):
weights_list.append(self.cval ** (tval_minus1))
weights = WeightsUtilities.normalise_weights(
np.array(weights_list, dtype=np.float32)
)
return weights
[docs]
def process(
self, cube: Cube, coord_name: str, inverse_ordering: bool = False
) -> Cube:
"""
Calculate nonlinear weights for a given cube and coord.
Args:
cube:
Cube to be blended across the coord.
coord_name:
Name of coordinate in the cube to be blended.
inverse_ordering:
The input cube blend coordinate will be in ascending order,
so that calculated blend weights decrease with increasing
value. For eg cycle blending by forecast reference time, we
wish to weight more recent cubes more highly. This flag gives
the option to reverse the blend coordinate order so as to have
higher weights for the higher values.
Returns:
1D cube of normalised (sum = 1.0) weights matching input
dimension to be blended
Raises:
TypeError : input is not a cube
"""
if not isinstance(cube, iris.cube.Cube):
msg = (
"The first argument must be an instance of "
"iris.cube.Cube but is"
" {0:s}".format(str(type(cube)))
)
raise TypeError(msg)
if inverse_ordering:
# make a copy of the input cube from which to calculate weights
inverted_cube = cube.copy()
inverted_cube = sort_coord_in_cube(
inverted_cube, coord_name, descending=True
)
cube = inverted_cube
weights = self.nonlinear_weights(len(cube.coord(coord_name).points))
weights_cube = WeightsUtilities.build_weights_cube(cube, weights, coord_name)
if inverse_ordering:
# re-sort the weights cube so that it is in ascending order of
# blend coordinate (and hence matches the input cube)
weights_cube = sort_coord_in_cube(weights_cube, coord_name)
return weights_cube
def __repr__(self):
"""Represent the configured plugin instance as a string."""
desc = "<ChooseDefaultWeightsNonLinear cval={0:4.1f}>".format(self.cval)
return desc
[docs]
class ChooseDefaultWeightsTriangular(BasePlugin):
"""Calculate Default Weights using a Triangular Function."""
[docs]
def __init__(self, width: float, units: Union[Unit, str] = "no_unit") -> None:
"""Set up for calculating default weights using triangular function.
Args:
width:
The width of the triangular function from the centre point.
units:
The cf units of the width and midpoint.
"""
self.width = width
if not isinstance(units, cf_units.Unit):
units = cf_units.Unit(units)
self.parameters_units = units
def __repr__(self) -> str:
"""Represent the configured plugin instance as a string."""
msg = "<ChooseDefaultTriangularWeights width={}, parameters_units={}>"
desc = msg.format(self.width, self.parameters_units)
return desc
[docs]
@staticmethod
def triangular_weights(
coord_vals: ndarray, midpoint: float, width: float
) -> ndarray:
"""Calculate triangular weights.
Args:
coord_vals:
An array of coordinate values that we want to calculate
weights for.
midpoint:
The centre point of the triangular function.
width:
The width from the triangle’s centre point, in units of the plugin's
units argument, which will determine the triangular weighting function
used to blend that specified point with its adjacent points. Beyond
this width the weighting drops to zero.
Returns:
An array of weights, the sum of which should equal 1.0.
"""
def calculate_weight(point: float, slope: float) -> float:
"""
A helper function to calculate the weights for each point using a
piecewise function to build up the triangular function.
Args:
point:
The point in the coordinate from the cube for
which we want to calculate a weight for.
slope:
The gradient of the triangle, calculated from
1/(width of triangle).
Returns:
The individual weight calculated by the function.
"""
if point == midpoint:
weight = 1
else:
weight = 1 - abs(point - midpoint) * slope
return weight
slope = 1.0 / width
weights = np.zeros(coord_vals.shape, dtype=np.float32)
# Find the indices of the points where there will be non-zero weights.
condition = (coord_vals >= (midpoint - width)) & (
coord_vals <= (midpoint + width)
)
points_with_weights = np.where(condition)[0]
# Calculate for weights for points where we want a non-zero weight.
for index in points_with_weights:
weights[index] = calculate_weight(coord_vals[index], slope)
# Normalise the weights.
weights = WeightsUtilities.normalise_weights(weights)
return weights
[docs]
def process(self, cube: Cube, coord_name: str, midpoint: float) -> Cube:
"""Calculate triangular weights for a given cube and coord.
Args:
cube:
Cube to blend across the coord.
coord_name:
Name of coordinate in the cube to be blended.
midpoint:
The centre point of the triangular function. This is
assumed to be provided in the same units as "self.width",
ie "self.parameter_units" as initialised.
Returns:
1D cube of normalised (sum = 1.0) weights matching length
of input dimension to be blended.
Raises:
TypeError : input is not a cube
"""
if not isinstance(cube, iris.cube.Cube):
msg = (
"The first argument must be an instance of "
"iris.cube.Cube but is"
" {0:s}".format(str(type(cube)))
)
raise TypeError(msg)
cube_coord = cube.coord(coord_name)
coord_vals = cube_coord.points
coord_units = cube_coord.units
# Rescale width and midpoint if in different units to the coordinate
if coord_units != self.parameters_units:
width_in_coord_units = self.parameters_units.convert(
self.width, coord_units
)
midpoint = self.parameters_units.convert(midpoint, coord_units)
else:
width_in_coord_units = copy.deepcopy(self.width)
weights = self.triangular_weights(coord_vals, midpoint, width_in_coord_units)
weights_cube = WeightsUtilities.build_weights_cube(cube, weights, coord_name)
return weights_cube