
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines all the "plugins" specific to Ensemble Model Output
Statistics (EMOS).

.. Further information is available in:
.. include:: extended_documentation/calibration/ensemble_calibration/
   ensemble_calibration.rst

"""

import warnings
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import iris
import numpy as np
from cf_units import Unit
from iris.coords import Coord
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from numpy import ndarray
from scipy.optimize import OptimizeResult, minimize
from scipy.stats import norm

from improver import BasePlugin, PostProcessingPlugin
from improver.calibration.utilities import (
    broadcast_data_to_time_coord,
    check_data_sufficiency,
    check_forecast_consistency,
    check_predictor,
    convert_cube_data_to_2d,
    create_unified_frt_coord,
    filter_non_matching_cubes,
    flatten_ignoring_masked_data,
    forecast_coords_match,
    merge_land_and_sea,
)
from improver.ensemble_copula_coupling.ensemble_copula_coupling import (
    ConvertLocationAndScaleParametersToPercentiles,
    ConvertLocationAndScaleParametersToProbabilities,
    ConvertProbabilitiesToPercentiles,
    EnsembleReordering,
    RebadgePercentilesAsRealizations,
    ResamplePercentiles,
)
from improver.metadata.probabilistic import find_percentile_coordinate
from improver.metadata.utilities import (
    create_new_diagnostic_cube,
    generate_mandatory_attributes,
)
from improver.utilities.cube_manipulation import collapsed, enforce_coordinate_ordering


class ContinuousRankedProbabilityScoreMinimisers(BasePlugin):
    """
    Minimise the Continuous Ranked Probability Score (CRPS)

    Calculate the optimised coefficients for minimising the CRPS based on
    assuming a particular probability distribution for the phenomenon being
    calibrated. The number of coefficients that will be optimised depends
    upon the initial guess. The coefficients will be calculated either using
    all points provided, or separately for each point. Minimisation is
    performed using the Nelder-Mead algorithm, capped at a maximum number of
    iterations, to limit the computational expense. Note that the BFGS
    algorithm was initially trialled, but was found to contain a bug when
    compared against comparative results generated in R.
    """

    # The tolerated percentage change for the final iteration when
    # performing the minimisation.
    TOLERATED_PERCENTAGE_CHANGE = 5

    # An arbitrary value set if an infinite value is detected
    # as part of the minimisation.
    BAD_VALUE = np.float64(999999)

    def __init__(
        self,
        predictor: str,
        tolerance: float = 0.02,
        max_iterations: int = 1000,
        point_by_point: bool = False,
    ) -> None:
        """
        Initialise class for performing minimisation of the Continuous
        Ranked Probability Score (CRPS).

        Args:
            predictor:
                String to specify the form of the predictor used to
                calculate the location parameter when estimating the EMOS
                coefficients. Currently the ensemble mean ("mean") and the
                ensemble realizations ("realizations") are supported as the
                predictors.
            tolerance:
                The tolerance for the Continuous Ranked Probability Score
                (CRPS) calculated by the minimisation. The CRPS is in the
                units of the variable being calibrated. The tolerance is
                therefore representative of how close to the actual value we
                are aiming to forecast for a particular variable. Once
                multiple iterations result in a CRPS equal to the same value
                within the specified tolerance, the minimisation will
                terminate.
            max_iterations:
                The maximum number of iterations allowed until the
                minimisation has converged to a stable solution. If the
                maximum number of iterations is reached but the minimisation
                has not yet converged to a stable solution, then the
                available solution is used anyway, and a warning is raised.
                If the predictor is "realizations", then the number of
                iterations may require increasing, as there will be more
                coefficients to solve for.
            point_by_point:
                If True, coefficients are calculated independently for each
                point within the input cube by minimising each point
                independently.
        """
        # Dictionary containing the functions that will be minimised,
        # depending upon the distribution requested. The names of these
        # distributions match the names of distributions in scipy.stats.
        self.minimisation_dict = {
            "norm": self.calculate_normal_crps,
            "truncnorm": self.calculate_truncated_normal_crps,
        }
        self.predictor = check_predictor(predictor)
        self.tolerance = tolerance
        # Maximum iterations for minimisation using Nelder-Mead.
        self.max_iterations = max_iterations
        self.point_by_point = point_by_point

    def _normal_crps_preparation(
        self,
        initial_guess: ndarray,
        forecast_predictor: ndarray,
        truth: ndarray,
        forecast_var: ndarray,
    ) -> Tuple[ndarray, ndarray, ndarray, ndarray, ndarray]:
        """
        Prepare for the CRPS calculation by computing estimates for the
        location parameter (mu), scale parameter (sigma), normalised
        prediction error (xz) and the corresponding CDF and PDF, assuming a
        normal distribution.

        Args:
            initial_guess
            forecast_predictor
            truth
            forecast_var

        Returns:
            The location parameter (mu), scale parameter (sigma), normalised
            prediction error (xz) and the corresponding CDF and PDF, assuming
            a normal distribution.
        """
        aa, bb, gamma, delta = (
            initial_guess[0],
            initial_guess[1:-2],
            initial_guess[-2],
            initial_guess[-1],
        )

        if self.predictor == "mean":
            a_b = np.array([aa, *np.atleast_1d(bb)], dtype=np.float64)
        elif self.predictor == "realizations":
            bb = bb * bb
            a_b = np.array([aa] + bb.tolist(), dtype=np.float64)

        new_col = np.ones(truth.shape, dtype=np.float32)
        all_data = np.column_stack((new_col, forecast_predictor))
        mu = np.dot(all_data, a_b)
        sigma = np.sqrt(gamma * gamma + delta * delta * forecast_var)
        xz = (truth - mu) / sigma
        normal_cdf = norm.cdf(xz)
        normal_pdf = norm.pdf(xz)
        return mu, sigma, xz, normal_cdf, normal_pdf

    def calculate_normal_crps(
        self,
        initial_guess: ndarray,
        forecast_predictor: ndarray,
        truth: ndarray,
        forecast_var: ndarray,
        sqrt_pi: float,
    ) -> float:
        """
        Calculate the CRPS for a normal distribution.

        Scientific Reference:
        Gneiting, T. et al., 2005.
        Calibrated Probabilistic Forecasting Using Ensemble Model Output
        Statistics and Minimum CRPS Estimation.
        Monthly Weather Review, 133(5), pp.1098-1118.

        Args:
            initial_guess:
                List of optimised coefficients. Order of coefficients is
                [alpha, beta, gamma, delta]. Multiple beta values can be
                provided if either realizations are provided as the
                predictor, or if additional predictors are provided.
            forecast_predictor:
                Data to be used as the predictor, either the ensemble mean
                or the ensemble realizations of the predictand variable and
                additional static predictors, as required.
            truth:
                Data to be used as truth.
            forecast_var:
                Ensemble variance data.
            sqrt_pi:
                Square root of Pi.

        Returns:
            CRPS for the current set of coefficients. This CRPS is a mean
            value across all points.
        """
        mu, sigma, xz, normal_cdf, normal_pdf = self._normal_crps_preparation(
            initial_guess, forecast_predictor, truth, forecast_var
        )

        if np.isfinite(np.min(mu / sigma)):
            result = np.nanmean(
                sigma * (xz * (2 * normal_cdf - 1) + 2 * normal_pdf - 1 / sqrt_pi)
            )
        else:
            result = self.BAD_VALUE
        return result

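    # Illustrative numeric sketch (hypothetical values, not part of the
    # plugin) of the closed-form normal CRPS evaluated above. For a single
    # point with truth y, location mu and scale sigma:
    #
    #     y, mu, sigma = 2.0, 1.8, 0.5
    #     xz = (y - mu) / sigma                       # 0.4
    #     crps = sigma * (xz * (2 * norm.cdf(xz) - 1)
    #                     + 2 * norm.pdf(xz) - 1 / np.sqrt(np.pi))
    #
    # which gives a CRPS of roughly 0.15 in the units of the variable.
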
    def calculate_truncated_normal_crps(
        self,
        initial_guess: ndarray,
        forecast_predictor: ndarray,
        truth: ndarray,
        forecast_var: ndarray,
        sqrt_pi: float,
    ) -> float:
        """
        Calculate the CRPS for a truncated normal distribution with zero
        as the lower bound.

        Scientific Reference:
        Thorarinsdottir, T.L. & Gneiting, T., 2010.
        Probabilistic forecasts of wind speed: Ensemble model output
        statistics by using heteroscedastic censored regression.
        Journal of the Royal Statistical Society.
        Series A: Statistics in Society, 173(2), pp.371-388.

        Args:
            initial_guess:
                List of optimised coefficients. Order of coefficients is
                [alpha, beta, gamma, delta]. Multiple beta values can be
                provided if either realizations are provided as the
                predictor, or if additional predictors are provided.
            forecast_predictor:
                Data to be used as the predictor, either the ensemble mean
                or the ensemble realizations of the predictand variable and
                additional static predictors, as required.
            truth:
                Data to be used as truth.
            forecast_var:
                Ensemble variance data.
            sqrt_pi:
                Square root of Pi.

        Returns:
            CRPS for the current set of coefficients. This CRPS is a mean
            value across all points.
        """
        mu, sigma, xz, normal_cdf, normal_pdf = self._normal_crps_preparation(
            initial_guess, forecast_predictor, truth, forecast_var
        )

        x0 = mu / sigma
        normal_cdf_0 = norm.cdf(x0)
        normal_cdf_root_two = norm.cdf(np.sqrt(2) * x0)

        if np.isfinite(np.min(mu / sigma)) or (np.min(mu / sigma) >= -3):
            result = np.nanmean(
                (sigma / (normal_cdf_0 * normal_cdf_0))
                * (
                    xz * normal_cdf_0 * (2 * normal_cdf + normal_cdf_0 - 2)
                    + 2 * normal_pdf * normal_cdf_0
                    - normal_cdf_root_two / sqrt_pi
                )
            )
        else:
            result = self.BAD_VALUE
        return result

    def _calculate_percentage_change_in_last_iteration(
        self, allvecs: List[ndarray]
    ) -> None:
        """
        Calculate the percentage change that has occurred within the last
        iteration of the minimisation. If the percentage change between the
        last iteration and the last-but-one iteration exceeds the threshold,
        a warning message is raised.

        Args:
            allvecs:
                List of numpy arrays containing the optimised coefficients,
                after each iteration.

        Warns:
            Warning: If a satisfactory minimisation has not been achieved.
        """
        last_iteration_percentage_change = (
            np.absolute((allvecs[-1] - allvecs[-2]) / allvecs[-2]) * 100
        )
        if np.any(last_iteration_percentage_change > self.TOLERATED_PERCENTAGE_CHANGE):
            np.set_printoptions(suppress=True)
            msg = (
                "The final iteration resulted in a percentage change "
                f"that is greater than the accepted threshold of "
                f"{self.TOLERATED_PERCENTAGE_CHANGE}% "
                f"i.e. {last_iteration_percentage_change}. "
                "\nA satisfactory minimisation has not been achieved. "
                f"\nLast iteration: {allvecs[-1]}, "
                f"\nLast-but-one iteration: {allvecs[-2]}"
                f"\nAbsolute difference: {np.absolute(allvecs[-2] - allvecs[-1])}\n"
            )
            warnings.warn(msg)

    def _minimise_caller(
        self,
        minimisation_function: Callable,
        initial_guess: ndarray,
        forecast_predictor_data: ndarray,
        truth_data: ndarray,
        forecast_var_data: ndarray,
        sqrt_pi: float,
    ) -> OptimizeResult:
        """Call scipy minimize with the options provided.

        Args:
            minimisation_function
            initial_guess
            forecast_predictor_data
            truth_data
            forecast_var_data
            sqrt_pi:
                Square root of pi for minimisation.

        Returns:
            The result of the minimisation, the x attribute of which
            contains a single set of coefficients with the order
            [alpha, beta, gamma, delta].
        """
        optimised_coeffs = minimize(
            minimisation_function,
            initial_guess,
            args=(forecast_predictor_data, truth_data, forecast_var_data, sqrt_pi),
            method="Nelder-Mead",
            tol=self.tolerance,
            options={"maxiter": self.max_iterations, "return_all": True},
        )
        return optimised_coeffs

    def _prepare_forecasts(self, forecast_predictors: CubeList) -> ndarray:
        """Prepare forecasts to be a consistent shape for minimisation by
        broadcasting static predictors along the time dimension and
        flattening the spatiotemporal dimensions.

        Args:
            forecast_predictors:
                The forecast predictors to be reshaped.

        Returns:
            Reshaped array with a first dimension representing the flattened
            spatiotemporal dimensions and an optional second dimension for
            flattened non-spatiotemporal dimensions (e.g. realizations).
        """
        preserve_leading_dimension = self.predictor == "realizations"
        forecast_predictors = broadcast_data_to_time_coord(forecast_predictors)

        flattened_forecast_predictors = []
        for fp_data in forecast_predictors:
            flattened_forecast_predictors.append(
                flatten_ignoring_masked_data(
                    fp_data, preserve_leading_dimension=preserve_leading_dimension
                )
            )

        if len(forecast_predictors) > 1:
            forecast_predictor_data = np.ma.vstack(flattened_forecast_predictors)
        else:
            (forecast_predictor_data,) = flattened_forecast_predictors
        return forecast_predictor_data

    def _process_points_independently(
        self,
        minimisation_function: Callable,
        initial_guess: ndarray,
        forecast_predictors: CubeList,
        truth: Cube,
        forecast_var: Cube,
        sqrt_pi: float,
    ) -> ndarray:
        """Minimise each point along the spatial dimensions independently to
        create a set of coefficients for each point. The coefficients
        returned can be either gridded (i.e. separate dimensions for x and
        y) or for a list of sites where x and y share a common dimension.

        Args:
            minimisation_function:
                Function to use when minimising.
            initial_guess
            forecast_predictors
            truth
            forecast_var
            sqrt_pi

        Returns:
            Separate optimised coefficients for each point. The shape of the
            coefficients array is (number of coefficients, length of spatial
            dimensions). Order of coefficients is [alpha, beta, gamma,
            delta]. Multiple beta values can be provided if either
            realizations are provided as the predictor, or if additional
            predictors are provided.
        """
        fp_template = forecast_predictors[0]
        sindex = [fp_template.coord(axis="y"), fp_template.coord(axis="x")]
        y_name = truth.coord(axis="y").name()
        x_name = truth.coord(axis="x").name()

        optimised_coeffs = []
        for index, (truth_slice, fv_slice) in enumerate(
            zip(truth.slices_over(sindex), forecast_var.slices_over(sindex))
        ):
            # Extract forecast predictor cubelist to match truth and
            # variance cubes.
            constr = iris.Constraint(
                coord_values={
                    y_name: lambda cell: any(
                        np.isclose(cell.point, truth_slice.coord(axis="y").points)
                    ),
                    x_name: lambda cell: any(
                        np.isclose(cell.point, truth_slice.coord(axis="x").points)
                    ),
                }
            )
            forecast_predictors_slice = forecast_predictors.extract(constr)
            forecast_predictor_data = self._prepare_forecasts(
                forecast_predictors_slice
            )

            if all(np.isnan(truth_slice.data)):
                optimised_coeffs.append(
                    np.array(initial_guess[index], dtype=np.float32)
                )
            else:
                optimised_coeffs.append(
                    self._minimise_caller(
                        minimisation_function,
                        initial_guess[index],
                        forecast_predictor_data.T,
                        truth_slice.data,
                        fv_slice.data,
                        sqrt_pi,
                    ).x.astype(np.float32)
                )

        y_coord = fp_template.coord(axis="y")
        x_coord = fp_template.coord(axis="x")
        if fp_template.coord_dims(y_coord) == fp_template.coord_dims(x_coord):
            return np.transpose(np.array(optimised_coeffs)).reshape(
                (len(initial_guess[0]), len(fp_template.coord(axis="y").points))
            )
        else:
            return np.transpose(np.array(optimised_coeffs)).reshape(
                (
                    len(initial_guess[0]),
                    len(fp_template.coord(axis="y").points),
                    len(fp_template.coord(axis="x").points),
                )
            )

    def _process_points_together(
        self,
        minimisation_function: Callable,
        initial_guess: ndarray,
        forecast_predictors: CubeList,
        truth: Cube,
        forecast_var: Cube,
        sqrt_pi: float,
    ) -> ndarray:
        """Minimise all points together in one minimisation to create a
        single set of coefficients.

        Args:
            minimisation_function:
                Function to use when minimising.
            initial_guess
            forecast_predictors
            truth
            forecast_var
            sqrt_pi

        Returns:
            The optimised coefficients. Order of coefficients is [alpha,
            beta, gamma, delta]. Multiple beta values can be returned if
            either realizations are provided as the predictor, or if
            additional predictors are provided.
        """
        # Flatten the data arrays and remove any missing data.
        truth_data = flatten_ignoring_masked_data(truth.data)
        forecast_var_data = flatten_ignoring_masked_data(forecast_var.data)
        forecast_predictor_data = self._prepare_forecasts(forecast_predictors)

        optimised_coeffs = self._minimise_caller(
            minimisation_function,
            initial_guess,
            forecast_predictor_data.T,
            truth_data,
            forecast_var_data,
            sqrt_pi,
        )

        if not optimised_coeffs.success:
            msg = (
                "Minimisation did not result in convergence after "
                "{} iterations. \n{}".format(
                    self.max_iterations, optimised_coeffs.message
                )
            )
            warnings.warn(msg)
        self._calculate_percentage_change_in_last_iteration(optimised_coeffs.allvecs)
        return optimised_coeffs.x.astype(np.float32)

    def process(
        self,
        initial_guess: ndarray,
        forecast_predictors: CubeList,
        truth: Cube,
        forecast_var: Cube,
        distribution: str,
    ) -> ndarray:
        """
        Function to pass a given function to the scipy minimize function to
        estimate optimised values for the coefficients.

        Further information is available in the
        :mod:`module level docstring <improver.calibration.ensemble_calibration>`.

        Args:
            initial_guess:
                List of optimised coefficients. Order of coefficients is
                [alpha, beta, gamma, delta]. Multiple beta values can be
                provided if either realizations are provided as the
                predictor, or if additional predictors are provided.
            forecast_predictors:
                CubeList containing the fields to be used as the predictor.
                These will include the ensemble mean or realizations of the
                predictand variable and additional static predictors, as
                required.
            truth:
                Cube containing the field, which will be used as truth.
            forecast_var:
                Cube containing the field containing the ensemble variance.
            distribution:
                String used to access the appropriate function for use in
                the minimisation within self.minimisation_dict.

        Returns:
            The optimised coefficients following the order [alpha, beta,
            gamma, delta]. If point_by_point is False, then one set of
            coefficients is generated. If point_by_point is True, then the
            leading dimension of the numpy array is the length of the
            spatial dimensions within the forecast and truth cubes. Each set
            of coefficients is appropriate for a particular point. If
            realizations or static additional predictors are provided, then
            multiple values for beta will be generated.

        Raises:
            KeyError: If the distribution is not supported.

        Warns:
            Warning: If the minimisation did not converge.
        """
        try:
            minimisation_function = self.minimisation_dict[distribution]
        except KeyError as err:
            msg = (
                "Distribution requested {} is not supported in {}. "
                "Error message is {}".format(
                    distribution, self.minimisation_dict, err
                )
            )
            raise KeyError(msg)

        if self.predictor == "realizations":
            for forecast_predictor in forecast_predictors:
                enforce_coordinate_ordering(forecast_predictor, "realization")

        # Set values to float64 precision. The increased precision is needed
        # for stable coefficient calculation.
        initial_guess = np.array(initial_guess, dtype=np.float64)
        for index in range(len(forecast_predictors)):
            forecast_predictors[index].data = forecast_predictors[index].data.astype(
                np.float64
            )
        forecast_var.data = forecast_var.data.astype(np.float64)
        truth.data = truth.data.astype(np.float64)

        sqrt_pi = np.sqrt(np.pi)

        if self.point_by_point:
            optimised_coeffs = self._process_points_independently(
                minimisation_function,
                initial_guess,
                forecast_predictors,
                truth,
                forecast_var,
                sqrt_pi,
            )
        else:
            optimised_coeffs = self._process_points_together(
                minimisation_function,
                initial_guess,
                forecast_predictors,
                truth,
                forecast_var,
                sqrt_pi,
            )
        return optimised_coeffs


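# A minimal, self-contained sketch (not part of the plugin) of the
# minimisation strategy used by ContinuousRankedProbabilityScoreMinimisers:
# Nelder-Mead applied to the closed-form normal CRPS for toy data. All
# values below are hypothetical.
def _toy_crps_minimisation_example():
    def toy_normal_crps(coeffs, fp_mean, truth, fp_var, sqrt_pi):
        # coeffs ordered [alpha, beta, gamma, delta], as in the plugin.
        alpha, beta, gamma, delta = coeffs
        mu = alpha + beta * fp_mean
        sigma = np.sqrt(gamma * gamma + delta * delta * fp_var)
        xz = (truth - mu) / sigma
        return np.nanmean(
            sigma * (xz * (2 * norm.cdf(xz) - 1) + 2 * norm.pdf(xz) - 1 / sqrt_pi)
        )

    rng = np.random.default_rng(0)
    fp_mean = rng.normal(10, 2, 100)  # toy ensemble-mean forecasts
    truth = fp_mean + rng.normal(0.5, 1.0, 100)  # toy truths with a bias
    fp_var = np.full(100, 1.0)  # toy ensemble variances
    result = minimize(
        toy_normal_crps,
        [0, 1, 0, 1],  # default initial guess: [alpha, beta, gamma, delta]
        args=(fp_mean, truth, fp_var, np.sqrt(np.pi)),
        method="Nelder-Mead",
        tol=0.02,
        options={"maxiter": 1000},
    )
    return result.x  # optimised [alpha, beta, gamma, delta]

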
class EstimateCoefficientsForEnsembleCalibration(BasePlugin):
    """
    Class focussing on estimating the optimised coefficients for ensemble
    calibration.
    """

    def __init__(
        self,
        distribution: str,
        point_by_point: bool = False,
        use_default_initial_guess: bool = False,
        desired_units: Optional[Union[str, Unit]] = None,
        predictor: str = "mean",
        tolerance: float = 0.02,
        max_iterations: int = 1000,
        proportion_of_nans: float = 0.5,
    ) -> None:
        """
        Create an ensemble calibration plugin that, for Nonhomogeneous
        Gaussian Regression, calculates coefficients based on historical
        forecasts and applies the coefficients to the current forecast.

        Further information is available in the
        :mod:`module level docstring <improver.calibration.ensemble_calibration>`.

        Args:
            distribution:
                Name of distribution. Assume that a calibrated version of
                the current forecast could be represented using this
                distribution.
            point_by_point:
                If True, coefficients are calculated independently for each
                point within the input cube by creating an initial guess and
                minimising each grid point independently. Please note this
                option is memory intensive and is unsuitable for gridded
                input. Using a default initial guess may reduce the memory
                overhead.
            use_default_initial_guess:
                If True, use the default initial guess. The default initial
                guess assumes no adjustments are required to the initial
                choice of predictor to generate the calibrated distribution.
                This means coefficients of 1 for the multiplicative
                coefficients and 0 for the additive coefficients. If False,
                the initial guess is computed.
            desired_units:
                The unit that you would like the calibration to be
                undertaken in. The current forecast, historical forecast and
                truth will be converted as required.
            predictor:
                String to specify the form of the predictor used to
                calculate the location parameter when estimating the EMOS
                coefficients. Currently the ensemble mean ("mean") and the
                ensemble realizations ("realizations") are supported as the
                predictors.
            tolerance:
                The tolerance for the Continuous Ranked Probability Score
                (CRPS) calculated by the minimisation. The CRPS is in the
                units of the variable being calibrated. The tolerance is
                therefore representative of how close to the actual value we
                are aiming to forecast for a particular variable. Once
                multiple iterations result in a CRPS equal to the same value
                within the specified tolerance, the minimisation will
                terminate.
            max_iterations:
                The maximum number of iterations allowed until the
                minimisation has converged to a stable solution. If the
                maximum number of iterations is reached but the minimisation
                has not yet converged to a stable solution, then the
                available solution is used anyway, and a warning is raised.
                If the predictor is "realizations", then the number of
                iterations may require increasing, as there will be more
                coefficients to solve for.
            proportion_of_nans:
                The proportion of the matching historic forecast-truth pairs
                that are allowed to be NaN.
        """
        self.distribution = distribution
        self.point_by_point = point_by_point
        self.use_default_initial_guess = use_default_initial_guess
        # Ensure predictor is valid.
        self.predictor = check_predictor(predictor)
        self._validate_distribution()
        self.desired_units = desired_units
        self.tolerance = tolerance
        self.max_iterations = max_iterations
        self.proportion_of_nans = proportion_of_nans

        self.minimiser = ContinuousRankedProbabilityScoreMinimisers(
            self.predictor,
            tolerance=self.tolerance,
            max_iterations=self.max_iterations,
            point_by_point=self.point_by_point,
        )
        # Setting default values for coeff_names.
        self.coeff_names = ["alpha", "beta", "gamma", "delta"]

    def _validate_distribution(self) -> None:
        """Validate that the distribution supplied has a corresponding
        method for minimising the Continuous Ranked Probability Score.

        Raises:
            ValueError: If the distribution requested is not supported.
        """
        valid_distributions = ContinuousRankedProbabilityScoreMinimisers(
            self.predictor
        ).minimisation_dict.keys()
        if self.distribution not in valid_distributions:
            msg = (
                "Given distribution {} not available. Available "
                "distributions are {}".format(self.distribution, valid_distributions)
            )
            raise ValueError(msg)

    def _set_attributes(self, historic_forecasts: Cube) -> Dict[str, Any]:
        """Set attributes for use on the EMOS coefficients cube.

        Args:
            historic_forecasts:
                Historic forecasts from the training dataset.

        Returns:
            Attributes for an EMOS coefficients cube including
            "diagnostic_standard_name", "distribution", "shape_parameters"
            and an updated title.
        """
        attributes = {}
        attributes["diagnostic_standard_name"] = historic_forecasts.name()
        attributes["distribution"] = self.distribution
        if self.distribution == "truncnorm":
            # For the CRPS minimisation, the truncnorm distribution is
            # truncated at zero.
            attributes["shape_parameters"] = np.array([0, np.inf], dtype=np.float32)
        attributes["title"] = "Ensemble Model Output Statistics coefficients"
        return attributes

    @staticmethod
    def _create_temporal_coordinates(historic_forecasts: Cube) -> List[Coord]:
        """Create forecast reference time and forecast period coordinates
        for the EMOS coefficients cube.

        Args:
            historic_forecasts:
                Historic forecasts from the training dataset.

        Returns:
            List of the temporal coordinates.

        Raises:
            ValueError: If the forecast period is not the same for all
                historic forecasts.
        """
        # Create forecast reference time coordinate.
        frt_coord = create_unified_frt_coord(
            historic_forecasts.coord("forecast_reference_time")
        )

        fp_coord = historic_forecasts.coord("forecast_period")
        if fp_coord.shape[0] != 1:
            msg = (
                "The forecast period must be the same for all historic "
                "forecasts. Forecast periods found: {}".format(fp_coord.points)
            )
            raise ValueError(msg)

        return [frt_coord, fp_coord]

    def _get_spatial_associated_coordinates(
        self, historic_forecasts: Cube
    ) -> Tuple[List[int], List[Coord]]:
        """Set up the spatial dimensions and coordinates for the EMOS
        coefficients cube.

        Args:
            historic_forecasts:
                Historic forecasts from the training dataset.

        Returns:
            List of the spatial dimensions to retain within the coefficients
            cube and a list of the auxiliary coordinates that share the same
            dimension as the spatial coordinates.
        """
        template_dims = []
        if self.point_by_point:
            spatial_coords = [historic_forecasts.coord(axis=axis) for axis in "yx"]
            spatial_dims = {
                n
                for (n,) in [historic_forecasts.coord_dims(c) for c in spatial_coords]
            }
            template_dims = [x for x in spatial_dims]
            spatial_associated_coords = [
                c
                for d in template_dims
                for c in historic_forecasts.coords(dimensions=d)
            ]
        else:
            spatial_associated_coords = [
                historic_forecasts.coord(axis=axis).collapsed() for axis in "yx"
            ]
        return template_dims, spatial_associated_coords

    @staticmethod
    def _add_predictor_coords(
        template_cube: Cube, forecast_predictors: CubeList
    ) -> Cube:
        """Add predictor index and predictor name coordinates to the beta
        coefficient template cube to support the use of additional
        predictors.

        Args:
            template_cube:
                A template cube for storing the optimised beta coefficients.
            forecast_predictors:
                CubeList where each cube contains a separate forecast
                predictor.

        Returns:
            A cube with the predictor_index and predictor_name coordinates
            added. Single value dimension coordinates are converted to
            non-dimension coordinates.
        """
        template_cubes = iris.cube.CubeList()
        fp_names = []
        for index, fp in enumerate(forecast_predictors):
            template_cube_copy = template_cube.copy()
            predictor_index = iris.coords.DimCoord(
                np.array(index, dtype=np.int8), long_name="predictor_index", units="1"
            )
            template_cube_copy.add_aux_coord(predictor_index)
            template_cube_copy = iris.util.new_axis(
                template_cube_copy, predictor_index
            )
            template_cubes.append(template_cube_copy)
            fp_names.append(fp.name())
        template_cube = template_cubes.concatenate_cube()
        predictor_name = iris.coords.AuxCoord(
            fp_names, long_name="predictor_name", units="no_unit"
        )
        template_cube.add_aux_coord(
            predictor_name, data_dims=template_cube.coord_dims("predictor_index")
        )
        return iris.util.squeeze(template_cube)

    def _create_cubelist(
        self,
        optimised_coeffs: ndarray,
        historic_forecasts: Cube,
        forecast_predictors: CubeList,
    ) -> CubeList:
        """Create a cubelist by combining the optimised coefficients and the
        appropriate metadata. The units of the alpha and gamma coefficients
        match the units of the historic forecast. If the predictor is the
        realizations, then the beta coefficient cube contains a realization
        coordinate.

        Args:
            optimised_coeffs
            historic_forecasts:
                Historic forecasts from the training dataset.
            forecast_predictors

        Returns:
            CubeList constructed using the coefficients provided and using
            metadata from the historic_forecasts cube. Each cube within the
            cubelist is for a separate EMOS coefficient e.g. alpha, beta,
            gamma, delta.
        """
        (
            template_dims,
            spatial_associated_coords,
        ) = self._get_spatial_associated_coordinates(historic_forecasts)
        coords_to_replace = (
            self._create_temporal_coordinates(historic_forecasts)
            + spatial_associated_coords
        )
        coords_to_replace_names = [c.name() for c in coords_to_replace]

        cubelist = iris.cube.CubeList([])
        for optimised_coeff, coeff_name in zip(optimised_coeffs, self.coeff_names):
            used_dims = template_dims.copy()
            replacements = coords_to_replace_names.copy()
            if self.predictor == "realizations" and "beta" == coeff_name:
                used_dims = ["realization"] + used_dims
                replacements += ["realization"]

            template_cube = next(historic_forecasts.slices(used_dims))

            if "beta" == coeff_name:
                template_cube = self._add_predictor_coords(
                    template_cube, forecast_predictors
                )
                optimised_coeff = np.reshape(optimised_coeff, template_cube.shape)
                replacements += ["predictor_index", "predictor_name"]
            else:
                optimised_coeff = np.array(optimised_coeff)

            for coord in coords_to_replace:
                template_cube.replace_coord(coord)

            # Remove coordinates in the template cube that have not been
            # replaced and therefore updated.
            for coord in set([c.name() for c in template_cube.coords()]) - set(
                replacements
            ):
                template_cube.remove_coord(coord)

            coeff_units = "1"
            if coeff_name in ["alpha", "gamma"]:
                coeff_units = historic_forecasts.units

            cube = create_new_diagnostic_cube(
                f"emos_coefficient_{coeff_name}",
                coeff_units,
                template_cube,
                generate_mandatory_attributes([historic_forecasts]),
                optional_attributes=self._set_attributes(historic_forecasts),
                data=optimised_coeff,
            )
            cubelist.append(cube)
        return cubelist

    def create_coefficients_cubelist(
        self,
        optimised_coeffs: Union[List[float], ndarray],
        historic_forecasts: Cube,
        forecast_predictors: CubeList,
    ) -> CubeList:
        """Create a cubelist for storing the coefficients computed using
        EMOS.

        .. See the documentation for examples of these cubes.
        .. include:: extended_documentation/calibration/
           ensemble_calibration/create_coefficients_cube.rst

        Args:
            optimised_coeffs:
                Array or list of optimised coefficients. Order of
                coefficients is [alpha, beta, gamma, delta]. Multiple beta
                values can be provided if either realizations are provided
                as the predictor, or if additional predictors are provided.
            historic_forecasts:
                Historic forecasts from the training dataset.
            forecast_predictors:
                The predictors are the historic forecasts processed to be
                either in the form of the ensemble mean or the ensemble
                realizations and any additional predictors.

        Returns:
            CubeList constructed using the coefficients provided and using
            metadata from the historic_forecasts cube. Each cube within the
            cubelist is for a separate EMOS coefficient e.g. alpha, beta,
            gamma, delta.

        Raises:
            ValueError: If the number of coefficients in the
                optimised_coeffs does not match the expected number.
        """
        if self.predictor == "realizations" or len(forecast_predictors) > 1:
            optimised_coeffs = [
                optimised_coeffs[0],
                optimised_coeffs[1:-2],
                optimised_coeffs[-2],
                optimised_coeffs[-1],
            ]

        if len(optimised_coeffs) != len(self.coeff_names):
            msg = (
                "The number of coefficients in {} must equal the "
                "number of coefficient names {}.".format(
                    optimised_coeffs, self.coeff_names
                )
            )
            raise ValueError(msg)

        return self._create_cubelist(
            optimised_coeffs, historic_forecasts, forecast_predictors
        )

    def compute_initial_guess(
        self,
        truths: ndarray,
        forecast_predictor: ndarray,
        predictor: str,
        number_of_realizations: Optional[int],
    ) -> List[float]:
        """
        Function to compute initial guess of the alpha, beta, gamma and
        delta components of the EMOS coefficients by linear regression of
        the forecast predictor and the truths, if requested. Otherwise,
        default values for the coefficients will be used.

        If the predictor is "mean", then the order of the initial_guess is
        [alpha, beta, gamma, delta]. Otherwise, if the predictor is
        "realizations" then the order of the initial_guess is
        [alpha, beta0, beta1, beta2, gamma, delta], where the number of beta
        variables will correspond to the number of realizations. In this
        example initial guess, with three beta variables, there would
        correspondingly be three realizations.

        The default values for the initial guesses are in
        [alpha, beta, gamma, delta] ordering:

        * For the ensemble mean, the default initial guess: [0, 1, 0, 1]
          assumes that the raw forecast is skilful and the expected
          adjustments are small.

        * For the ensemble realizations, the default initial guess is
          effectively: [0, 1/3., 1/3., 1/3., 0, 1], such that each
          realization is assumed to have equal weight.

        If linear regression is enabled, the alpha and beta coefficients
        associated with the ensemble mean or ensemble realizations are
        modified based on the results from the linear regression fit.

        Args:
            truths:
                Array containing the truth fields.
            forecast_predictor:
                The predictors are the historic forecasts processed to be
                either in the form of the ensemble mean or the ensemble
                realizations and any additional predictors.
            predictor:
                String to specify the form of the predictor used to
                calculate the location parameter when estimating the EMOS
                coefficients. Currently the ensemble mean ("mean") and the
                ensemble realizations ("realizations") are supported as the
                predictors.
            number_of_realizations:
                Number of realizations within the forecast predictor. If no
                realizations are present, this option is None.

        Returns:
            List of coefficients to be used as initial guess. Order of
            coefficients is [alpha, beta, gamma, delta]. Multiple beta
            values can be provided if either realizations are provided as
            the predictor, or if additional predictors are provided.
        """
        import statsmodels.api as sm

        default_initial_guess = (
            self.use_default_initial_guess
            or np.any(np.isnan(truths))
            or np.any(np.isnan(forecast_predictor))
        )

        if predictor == "mean" and default_initial_guess:
            initial_beta = np.repeat(
                1.0 / forecast_predictor.shape[0], forecast_predictor.shape[0]
            ).tolist()
            initial_guess = [0] + initial_beta + [0, 1]
        elif predictor == "realizations" and default_initial_guess:
            initial_beta = np.repeat(
                np.sqrt(1.0 / number_of_realizations), number_of_realizations
            ).tolist()
            initial_guess = [0] + initial_beta + [0, 1]
        elif not self.use_default_initial_guess:
            truths_flattened = flatten_ignoring_masked_data(truths)
            forecast_predictor_flattened = flatten_ignoring_masked_data(
                forecast_predictor, preserve_leading_dimension=True
            )
            val = sm.add_constant(
                forecast_predictor_flattened.T, has_constant="add"
            ).astype(np.float64)
            est = sm.OLS(truths_flattened.astype(np.float64), val).fit()
            intercept = est.params[0].astype(np.float32)
            gradient = est.params[1:].astype(np.float32)
            initial_guess = [intercept] + gradient.tolist() + [0, 1]

        return np.array(initial_guess, dtype=np.float32)

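    # Hedged illustration (hypothetical values) of the linear-regression
    # initial guess computed above when the predictor is the ensemble mean:
    # regressing toy truths against toy forecast means with statsmodels OLS
    # yields [intercept, gradient, 0, 1] as the starting point for the
    # minimisation, e.g.
    #
    #     import statsmodels.api as sm
    #     fp = np.array([9.0, 10.0, 11.0, 12.0])      # toy forecast means
    #     truths = np.array([9.5, 10.5, 11.5, 12.5])  # toy truths
    #     est = sm.OLS(truths, sm.add_constant(fp)).fit()
    #     initial_guess = [est.params[0], est.params[1], 0, 1]
    #     # -> approximately [0.5, 1.0, 0, 1]
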
    @staticmethod
    def mask_cube(cube: Cube, landsea_mask: Cube) -> None:
        """
        Mask the input cube using the given landsea_mask. Sea points are
        filled with nans and masked.

        Args:
            cube:
                A cube to be masked, on the same grid as the landsea_mask.
                The last two dimensions on this cube must match the
                dimensions in the landsea_mask cube.
            landsea_mask:
                A cube containing a land-sea mask. Within the land-sea mask
                cube land points should be specified as ones, and sea points
                as zeros.

        Raises:
            IndexError: If the cube and landsea_mask shapes are not
                compatible.
        """
        try:
            cube.data[..., ~landsea_mask.data.astype(bool)] = np.nan
        except IndexError as err:
            msg = "Cube and landsea_mask shapes are not compatible. {}".format(err)
            raise IndexError(msg)
        else:
            cube.data = np.ma.masked_invalid(cube.data)

    def guess_and_minimise(
        self,
        truths: Cube,
        historic_forecasts: Cube,
        forecast_predictors: CubeList,
        forecast_var: Cube,
        number_of_realizations: Optional[int],
    ) -> CubeList:
        """Function to consolidate calls to compute the initial guess,
        compute the optimised coefficients using minimisation and store the
        resulting coefficients within a CubeList.

        Args:
            truths:
                Truths from the training dataset.
            historic_forecasts:
                Historic forecasts from the training dataset. These are used
                as a template cube for creating the coefficient cube.
            forecast_predictors:
                The predictors are the historic forecasts processed to be
                either in the form of the ensemble mean or the ensemble
                realizations and any additional predictors.
            forecast_var:
                Variance of the forecast for use in the minimisation.
            number_of_realizations:
                Number of realizations within the forecast predictor. If no
                realizations are present, this option is None.

        Returns:
            CubeList constructed using the coefficients provided and using
            metadata from the historic_forecasts cube. Each cube within the
            cubelist is for a separate EMOS coefficient e.g. alpha, beta,
            gamma, delta.
        """
        if self.point_by_point and not self.use_default_initial_guess:
            y_name = truths.coord(axis="y").name()
            x_name = truths.coord(axis="x").name()

            initial_guess = []
            for truth_slice in truths.slices_over([y_name, x_name]):
                constr = iris.Constraint(
                    coord_values={
                        y_name: lambda cell: any(
                            np.isclose(cell.point, truth_slice.coord(y_name).points)
                        ),
                        x_name: lambda cell: any(
                            np.isclose(cell.point, truth_slice.coord(x_name).points)
                        ),
                    }
                )
                forecast_predictors_slice = forecast_predictors.extract(constr)
                if self.predictor == "realizations":
                    forecast_predictors_data = forecast_predictors_slice[0].data
                else:
                    # If using mean as predictor, stack to produce one array
                    # where the leading dimension represents the number of
                    # predictors.
                    forecast_predictors_data = np.ma.stack(
                        broadcast_data_to_time_coord(forecast_predictors_slice)
                    )
                initial_guess.append(
                    self.compute_initial_guess(
                        truth_slice.data,
                        forecast_predictors_data,
                        self.predictor,
                        number_of_realizations,
                    )
                )
        else:
            # Computing initial guess for EMOS coefficients.
            if self.predictor == "realizations":
                forecast_predictor_data = forecast_predictors[0].data
            else:
                # If using mean as predictor, stack to produce one array
                # where the leading dimension represents the number of
                # predictors.
                forecast_predictor_data = np.ma.stack(
                    broadcast_data_to_time_coord(forecast_predictors)
                )

            initial_guess = self.compute_initial_guess(
                truths.data,
                forecast_predictor_data,
                self.predictor,
                number_of_realizations,
            )
            if self.point_by_point:
                initial_guess = np.broadcast_to(
                    initial_guess,
                    (
                        len(truths.coord(axis="y").points)
                        * len(truths.coord(axis="x").points),
                        len(initial_guess),
                    ),
                )

        # Calculate the optimised coefficients via minimisation.
        optimised_coeffs = self.minimiser(
            initial_guess,
            forecast_predictors,
            truths,
            forecast_var,
            self.distribution.lower(),
        )
        coefficients_cubelist = self.create_coefficients_cubelist(
            optimised_coeffs, historic_forecasts, forecast_predictors
        )
        return coefficients_cubelist

    def process(
        self,
        historic_forecasts: Cube,
        truths: Cube,
        additional_fields: Optional[CubeList] = None,
        landsea_mask: Optional[Cube] = None,
    ) -> CubeList:
        """
        Using Nonhomogeneous Gaussian Regression/Ensemble Model Output
        Statistics, estimate the required coefficients from historical
        forecasts.

        The main steps of this method are:

        1. Check that the predictor is valid.
        2. Filter the historic forecasts and truths to ensure that these
           inputs match in validity time.
        3. Apply unit conversion to ensure that the historic forecasts and
           truths have the desired units for calibration.
        4. Calculate the variance of the historic forecasts. If the chosen
           predictor is the mean, also calculate the mean of the historic
           forecasts.
        5. If a land-sea mask is provided then mask out sea points in the
           truths and predictor from the historic forecasts.
        6. Calculate initial guess at coefficient values by performing a
           linear regression, if requested, otherwise default values are
           used.
        7. Perform minimisation.

        Args:
            historic_forecasts:
                Historic forecasts from the training dataset.
            truths:
                Truths from the training dataset.
            additional_fields:
                Additional fields to use as supplementary predictors.
            landsea_mask:
                The optional cube containing a land-sea mask. If provided,
                only land points are used to calculate the coefficients.
                Within the land-sea mask cube land points should be
                specified as ones, and sea points as zeros.

        Returns:
            CubeList constructed using the coefficients provided and using
            metadata from the historic_forecasts cube. Each cube within the
            cubelist is for a separate EMOS coefficient e.g. alpha, beta,
            gamma, delta.

        Raises:
            ValueError: If either the historic_forecasts or truths cubes
                were not passed in.
            ValueError: If the units of the historic and truth cubes do not
                match.
        """
        if landsea_mask and self.point_by_point:
            msg = (
                "The use of a landsea mask with the option to compute "
                "coefficients independently at each point is not implemented."
            )
            raise NotImplementedError(msg)

        if not (historic_forecasts and truths):
            raise ValueError("historic_forecasts and truths cubes must be provided.")

        historic_forecasts, truths = filter_non_matching_cubes(
            historic_forecasts, truths
        )
        check_forecast_consistency(historic_forecasts)
        check_data_sufficiency(
            historic_forecasts, truths, self.point_by_point, self.proportion_of_nans
        )

        if additional_fields:
            if self.predictor.lower() == "realizations":
                msg = (
                    "Currently the usage of additional fields with the use "
                    "of realizations as the predictor is not supported."
                )
                raise NotImplementedError(msg)
            disallowed_coords = [
                "forecast_period",
                "forecast_reference_time",
                "realization",
            ]
            for cube in additional_fields:
                if any([cube.coords(c) for c in disallowed_coords]):
                    coords = [
                        cube.coord(c) for c in disallowed_coords if cube.coords(c)
                    ]
                    msg = (
                        "Only static additional predictors are supported. "
                        f"The {cube.name()} cube provided contains {coords}."
                    )
                    raise ValueError(msg)

        # Make sure inputs have the same units.
        if self.desired_units:
            historic_forecasts.convert_units(self.desired_units)
            truths.convert_units(self.desired_units)

        if historic_forecasts.units != truths.units:
            msg = (
                "The historic forecast units of {} do not match "
                "the truths units {}. These units must match, so that "
                "the coefficients can be estimated.".format(
                    historic_forecasts.units, truths.units
                )
            )
            raise ValueError(msg)

        number_of_realizations = None
        if self.predictor == "mean":
            forecast_predictors = iris.cube.CubeList(
                [collapsed(historic_forecasts, "realization", iris.analysis.MEAN)]
            )
        elif self.predictor == "realizations":
            number_of_realizations = len(
                historic_forecasts.coord("realization").points
            )
            enforce_coordinate_ordering(historic_forecasts, "realization")
            forecast_predictors = iris.cube.CubeList([historic_forecasts])

        if additional_fields:
            forecast_predictors.extend(additional_fields)

        forecast_var = collapsed(
            historic_forecasts, "realization", iris.analysis.VARIANCE
        )

        # If a landsea_mask is provided, mask out the sea points.
        if landsea_mask:
            [self.mask_cube(fp, landsea_mask) for fp in forecast_predictors]
            self.mask_cube(forecast_var, landsea_mask)
            self.mask_cube(truths, landsea_mask)

        coefficients_cubelist = self.guess_and_minimise(
            truths,
            historic_forecasts,
            forecast_predictors,
            forecast_var,
            number_of_realizations,
        )
        return coefficients_cubelist


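# Hedged usage sketch for coefficient estimation (not part of the module).
# `historic_forecasts` and `truths` are assumed to be iris Cubes with
# matching validity times, with a realization coordinate on the forecasts:
#
#     plugin = EstimateCoefficientsForEnsembleCalibration(
#         "norm", predictor="mean", tolerance=0.02, max_iterations=1000
#     )
#     coefficients = plugin(historic_forecasts, truths)
#
# `coefficients` is then a CubeList with one cube per EMOS coefficient
# (emos_coefficient_alpha, ..._beta, ..._gamma, ..._delta).

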
class CalibratedForecastDistributionParameters(BasePlugin):
    """
    Class to calculate calibrated forecast distribution parameters given an
    uncalibrated input forecast and EMOS coefficients.
    """

    def __init__(self, predictor: str = "mean") -> None:
        """
        Create a plugin that uses the coefficients created using EMOS from
        historical forecasts and corresponding truths and applies these
        coefficients to the current forecast to generate location and scale
        parameters that represent the calibrated distribution at each point.

        Args:
            predictor:
                String to specify the form of the predictor used to
                calculate the location parameter when estimating the EMOS
                coefficients. Currently the ensemble mean ("mean") and the
                ensemble realizations ("realizations") are supported as the
                predictors.
        """
        self.predictor = check_predictor(predictor)

        self.coefficients_cubelist = None
        self.current_forecast = None
        self.additional_fields = None

    def __repr__(self) -> str:
        """Represent the configured plugin instance as a string."""
        result = "<CalibratedForecastDistributionParameters: predictor: {}>"
        return result.format(self.predictor)

    def _diagnostic_match(self) -> None:
        """Check that the forecast diagnostic matches the diagnostic used to
        construct the coefficients.

        Raises:
            ValueError: If the forecast diagnostic and the coefficients cube
                diagnostic do not match.
        """
        for cube in self.coefficients_cubelist:
            diag = cube.attributes["diagnostic_standard_name"]
            if self.current_forecast.name() != diag:
                msg = (
                    f"The forecast diagnostic ({self.current_forecast.name()}) "
                    "does not match the diagnostic used to construct the "
                    f"coefficients ({diag})"
                )
                raise ValueError(msg)

    def _spatial_domain_match(self) -> None:
        """
        Check that the domain of the current forecast and coefficients cube
        match for gridded forecasts. For spot forecasts, the spatial domain
        check is skipped.

        Raises:
            ValueError: If the points or bounds of the specified axis of the
                current_forecast and coefficients_cube do not match.
        """
        # If spot data, skip the spatial domain check.
        if self.current_forecast.coords("wmo_id"):
            return

        msg = (
            "The points or bounds of the {} axis given by the current "
            "forecast {} do not match those given by the coefficients "
            "cube {}."
        )
        for axis in ["x", "y"]:
            for coeff_cube in self.coefficients_cubelist:
                if (
                    self.current_forecast.coord(axis=axis).collapsed().points
                    != coeff_cube.coord(axis=axis).collapsed().points
                ).all() or (
                    self.current_forecast.coord(axis=axis).collapsed().bounds
                    != coeff_cube.coord(axis=axis).collapsed().bounds
                ).all():
                    raise ValueError(
                        msg.format(
                            axis,
                            self.current_forecast.coord(axis=axis).collapsed(),
                            coeff_cube.coord(axis=axis).collapsed(),
                        )
                    )

    def _calculate_location_parameter_from_mean(self) -> ndarray:
        """
        Function to calculate the location parameter when the ensemble mean
        at each grid point is the predictor.

        Further information is available in the
        :mod:`module level docstring <improver.calibration.ensemble_calibration>`.

        Returns:
            Location parameter calculated using the ensemble mean as the
            predictor.

        Raises:
            ValueError: If the number of forecast predictors does not equal
                the number of beta coefficients.
        """
        forecast_predictors = iris.cube.CubeList(
            [collapsed(self.current_forecast, "realization", iris.analysis.MEAN)]
        )
        if self.additional_fields:
            forecast_predictors.extend(self.additional_fields)

        beta_cube = self.coefficients_cubelist.extract_cube("emos_coefficient_beta")
        fp_names = [fp.name() for fp in forecast_predictors]
        if len(forecast_predictors) != len(beta_cube.coord("predictor_index").points):
            n_coord_points = len(beta_cube.coord("predictor_index").points)
            coord_names = beta_cube.coord("predictor_name").points
            msg = (
                "The number of forecast predictors must equal the number of "
                "beta coefficients in order to create a calibrated forecast. "
                f"Number of predictor cubes = {len(forecast_predictors)}: {fp_names}, "
                f"Number of predictor coords = {n_coord_points}: {coord_names}"
            )
            raise ValueError(msg)

        # Calculate location parameter = a + b*X, where X is the
        # raw ensemble mean. In this case, b = beta.
        location_parameter = np.zeros(forecast_predictors[0].shape)
        for fp in forecast_predictors:
            constr = iris.Constraint(predictor_name=fp.name())
            location_parameter += beta_cube.extract(constr).data * fp.data
        location_parameter += self.coefficients_cubelist.extract_cube(
            "emos_coefficient_alpha"
        ).data
        location_parameter = location_parameter.astype(np.float32)

        return location_parameter

    def _calculate_location_parameter_from_realizations(self) -> ndarray:
        """
        Function to calculate the location parameter when the ensemble
        realizations are the predictor.

        Further information is available in the
        :mod:`module level docstring <improver.calibration.ensemble_calibration>`.

        Returns:
            Location parameter calculated using the ensemble realizations as
            the predictor.
        """
        forecast_predictor = self.current_forecast

        # Calculate location parameter = a + b1*X1 + ... + bn*Xn, where X is
        # the ensemble realizations. The number of b and X terms depends
        # upon the number of ensemble realizations. In this case, b = beta^2.
        beta_cube = self.coefficients_cubelist.extract_cube("emos_coefficient_beta")
        beta_values = np.atleast_2d(beta_cube.data * beta_cube.data)
        beta_values = (
            np.atleast_2d(np.squeeze(beta_values.T))
            if beta_cube.data.ndim != 1
            else beta_values
        )
        a_and_b = np.hstack(
            (
                np.atleast_2d(
                    self.coefficients_cubelist.extract_cube(
                        "emos_coefficient_alpha"
                    ).data
                ).T,
                beta_values,
            )
        )
        forecast_predictor_flat = convert_cube_data_to_2d(forecast_predictor)
        xy_shape = next(forecast_predictor.slices_over("realization")).shape
        col_of_ones = np.ones(np.prod(xy_shape), dtype=np.float32)
        ones_and_predictor = np.column_stack((col_of_ones, forecast_predictor_flat))
        location_parameter = (
            np.sum(ones_and_predictor * a_and_b, axis=-1)
            .reshape(xy_shape)
            .astype(np.float32)
        )
        return location_parameter

    def _calculate_scale_parameter(self) -> ndarray:
        """
        Calculation of the scale parameter using the ensemble variance
        adjusted using the gamma and delta coefficients calculated by EMOS.

        Further information is available in the
        :mod:`module level docstring <improver.calibration.ensemble_calibration>`.

        Returns:
            Scale parameter for defining the distribution of the calibrated
            forecast.
        """
        forecast_var = self.current_forecast.collapsed(
            "realization", iris.analysis.VARIANCE
        )

        # Calculating the scale parameter, based on the raw variance S^2,
        # where the predicted scale parameter (or equivalently standard
        # deviation for a normal distribution) = sqrt(c + dS^2), where
        # c = (gamma)^2 and d = (delta)^2.
        scale_parameter = np.sqrt(
            self.coefficients_cubelist.extract_cube("emos_coefficient_gamma").data
            * self.coefficients_cubelist.extract_cube("emos_coefficient_gamma").data
            + self.coefficients_cubelist.extract_cube("emos_coefficient_delta").data
            * self.coefficients_cubelist.extract_cube("emos_coefficient_delta").data
            * forecast_var.data
        ).astype(np.float32)
        return scale_parameter

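    # Worked numeric example of the scale parameter relationship above
    # (hypothetical coefficient values): with gamma = 0.2, delta = 1.1 and
    # a raw ensemble variance S^2 = 0.64,
    #
    #     scale = sqrt(gamma**2 + delta**2 * 0.64)
    #           = sqrt(0.04 + 1.21 * 0.64)
    #           = sqrt(0.8144) ~= 0.90
    #
    # so the calibrated standard deviation is inflated slightly relative to
    # the raw ensemble spread (sqrt(0.64) = 0.8).
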
    def _create_output_cubes(
        self, location_parameter: ndarray, scale_parameter: ndarray
    ) -> Tuple[Cube, Cube]:
        """
        Creation of output cubes containing the location and scale
        parameters.

        Args:
            location_parameter:
                Location parameter of the calibrated distribution.
            scale_parameter:
                Scale parameter of the calibrated distribution.

        Returns:
            - Location parameter of the calibrated distribution with
              associated metadata.
            - Scale parameter of the calibrated distribution with
              associated metadata.
        """
        template_cube = next(self.current_forecast.slices_over("realization"))
        template_cube.remove_coord("realization")

        location_parameter_cube = create_new_diagnostic_cube(
            "location_parameter",
            template_cube.units,
            template_cube,
            template_cube.attributes,
            data=location_parameter,
        )
        scale_parameter_cube = create_new_diagnostic_cube(
            "scale_parameter",
            template_cube.units,
            template_cube,
            template_cube.attributes,
            data=scale_parameter,
        )
        return location_parameter_cube, scale_parameter_cube

    def process(
        self,
        current_forecast: Cube,
        coefficients_cubelist: CubeList,
        additional_fields: Optional[CubeList] = None,
        landsea_mask: Optional[Cube] = None,
        tolerate_time_mismatch: Optional[bool] = False,
    ) -> Tuple[Cube, Cube]:
        """
        Apply the EMOS coefficients to the current forecast, in order to
        generate location and scale parameters for creating the calibrated
        distribution.

        Args:
            current_forecast:
                The cube containing the current forecast.
            coefficients_cubelist:
                CubeList of EMOS coefficients where each cube within the
                cubelist is for a separate EMOS coefficient e.g. alpha,
                beta, gamma, delta.
            additional_fields:
                Additional fields to be used as forecast predictors.
            landsea_mask:
                The optional cube containing a land-sea mask. If provided,
                sea points will be masked in the output cube. This cube
                needs to have land points set to 1 and sea points to 0.
            tolerate_time_mismatch:
                If True, tolerate a mismatch in validity time and forecast
                period for coefficients vs forecasts. Use with caution!

        Returns:
            - Cube containing the location parameter of the calibrated
              distribution calculated using either the ensemble mean or the
              ensemble realizations. The location parameter represents the
              point at which a resulting PDF would be centred.
            - Cube containing the scale parameter of the calibrated
              distribution calculated using either the ensemble mean or the
              ensemble realizations. The scale parameter represents the
              statistical dispersion of the resulting PDF, so a larger scale
              parameter will result in a broader PDF.
        """
        self.current_forecast = current_forecast
        self.additional_fields = additional_fields
        self.coefficients_cubelist = coefficients_cubelist

        # Check that the coefficients cubes and forecast cube are
        # compatible.
        self._diagnostic_match()
        if not tolerate_time_mismatch:
            # Check that the validity time and forecast period match.
            for cube in coefficients_cubelist:
                forecast_coords_match(cube, current_forecast)
        self._spatial_domain_match()

        if self.predictor == "mean":
            location_parameter = self._calculate_location_parameter_from_mean()
        else:
            location_parameter = (
                self._calculate_location_parameter_from_realizations()
            )

        scale_parameter = self._calculate_scale_parameter()

        location_parameter_cube, scale_parameter_cube = self._create_output_cubes(
            location_parameter, scale_parameter
        )

        # Use a mask to confine calibration to land regions by masking the
        # sea.
        if landsea_mask:
            # Calibration is applied to all grid points, but the areas where
            # the mask is valid are then masked out at the end. The cube
            # containing a land-sea mask has sea points defined as zeroes
            # and the land points as ones, so the mask needs to be flipped
            # here.
            flip_mask = np.logical_not(landsea_mask.data)
            scale_parameter_cube.data = np.ma.masked_where(
                flip_mask, scale_parameter_cube.data
            )
            location_parameter_cube.data = np.ma.masked_where(
                flip_mask, location_parameter_cube.data
            )

        return location_parameter_cube, scale_parameter_cube


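# Hedged usage sketch (not part of the module), assuming `forecast` is an
# iris Cube of ensemble realizations and `coefficients` is the CubeList
# produced by EstimateCoefficientsForEnsembleCalibration:
#
#     plugin = CalibratedForecastDistributionParameters(predictor="mean")
#     location_parameter, scale_parameter = plugin(forecast, coefficients)
#
# The two returned cubes define the calibrated distribution at each point.

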
class ApplyEMOS(PostProcessingPlugin):
    """
    Class to calibrate an input forecast given EMOS coefficients
    """

    def __init__(self, percentiles: Optional[Sequence] = None):
        """Initialise class.

        Args:
            percentiles:
                The set of percentiles used to create the calibrated
                forecast.
        """
        self.percentiles = (
            [np.float32(p) for p in percentiles] if percentiles else None
        )

    def _check_additional_field_sites(self, forecast, additional_fields):
        """Check that the forecast and additional fields have matching
        sites.

        Args:
            forecast:
                Uncalibrated forecast as probabilities, percentiles or
                realizations.
            additional_fields:
                Additional fields to be used as forecast predictors.

        Raises:
            ValueError: If the sites mismatch between the forecast and
                additional fields.
        """
        if additional_fields:
            if any([c.name() == "wmo_id" for c in forecast.coords()]):
                sites = [
                    np.array_equal(
                        p.coord("wmo_id").points, forecast.coord("wmo_id").points
                    )
                    for p in additional_fields
                ]
                if not np.all(sites):
                    mismatching_sites = []
                    for ap in additional_fields:
                        mismatching_sites.extend(
                            list(
                                set(ap.coord("wmo_id").points).symmetric_difference(
                                    set(forecast.coord("wmo_id").points)
                                )
                            )
                        )
                    msg = (
                        "The forecast and additional predictors have "
                        "mismatching sites. The mismatching sites are: "
                        f"{list(set(mismatching_sites))}"
                    )
                    raise ValueError(msg)

    def process(
        self,
        forecast: Cube,
        coefficients: CubeList,
        additional_fields: Optional[CubeList] = None,
        land_sea_mask: Optional[Cube] = None,
        prob_template: Optional[Cube] = None,
        realizations_count: Optional[int] = None,
        ignore_ecc_bounds: bool = True,
        tolerate_time_mismatch: bool = False,
        predictor: str = "mean",
        randomise: bool = False,
        random_seed: Optional[int] = None,
        return_parameters: bool = False,
    ) -> Union[Cube, CubeList]:
        """Calibrate input forecast using pre-calculated coefficients

        Args:
            forecast:
                Uncalibrated forecast as probabilities, percentiles or
                realizations.
            coefficients:
                EMOS coefficients.
            additional_fields:
                Additional fields to be used as forecast predictors.
            land_sea_mask:
                Land-sea mask where a value of "1" represents land points
                and "0" represents sea. If set, allows calibration of land
                points only.
            prob_template:
                A cube containing a probability forecast that will be used
                as a template when generating probability output when the
                input format of the forecast cube is not probabilities,
                i.e. realizations or percentiles.
            realizations_count:
                Number of realizations to use when generating the
                intermediate calibrated forecast from probability or
                percentile inputs.
            ignore_ecc_bounds:
                If True, allow percentiles from probabilities to exceed the
                ECC bounds range. If input is not probabilities, this is
                ignored.
            tolerate_time_mismatch:
                If True, tolerate a mismatch in validity time and forecast
                period for coefficients vs forecasts. Use with caution!
            predictor:
                Predictor to be used to calculate the location parameter of
                the calibrated distribution. Value is "mean" or
                "realizations".
            randomise:
                Used in generating calibrated realizations. If input
                forecast is probabilities or percentiles, this is ignored.
            random_seed:
                Used in generating calibrated realizations. If input
                forecast is probabilities or percentiles, this is ignored.
            return_parameters:
                If True, return location and scale parameters of the
                calibrated forecast distribution, rather than forecasts.
                This option supersedes all other inputs which affect the
                format of the output.

        Returns:
            Calibrated forecast in the form of the input (i.e.
            probabilities, percentiles or realizations), or a tuple
            containing cubes of the location and scale parameters of the
            calibrated forecast distribution if return_parameters is True.
        """
        self.input_forecast_type = get_forecast_type(forecast)
        self.output_forecast_type = (
            "probabilities" if prob_template else self.input_forecast_type
        )
        if land_sea_mask and self.input_forecast_type != self.output_forecast_type:
            msg = (
                "If supplying a land-sea mask, the format of the input "
                "forecast must be the same as the format of the output "
                "forecast to facilitate merging of pre-calibration "
                "and post-calibration data. The input forecast type was "
                f"{self.input_forecast_type}. The output forecast type "
                f"was {self.output_forecast_type}."
            )
            raise ValueError(msg)

        self._check_additional_field_sites(forecast, additional_fields)

        forecast_as_realizations = forecast.copy()
        if self.input_forecast_type != "realizations":
            forecast_as_realizations = convert_to_realizations(
                forecast.copy(), realizations_count, ignore_ecc_bounds
            )

        calibration_plugin = CalibratedForecastDistributionParameters(
            predictor=predictor
        )
        location_parameter, scale_parameter = calibration_plugin(
            forecast_as_realizations,
            coefficients,
            additional_fields=additional_fields,
            landsea_mask=land_sea_mask,
            tolerate_time_mismatch=tolerate_time_mismatch,
        )

        if return_parameters:
            return location_parameter, scale_parameter

        self.distribution = {
            "name": get_attribute_from_coefficients(coefficients, "distribution"),
            "location": location_parameter,
            "scale": scale_parameter,
            "shape": get_attribute_from_coefficients(
                coefficients, "shape_parameters", optional=True
            ),
        }

        template = prob_template if prob_template else forecast
        result = generate_forecast_from_distribution(
            self.distribution, template, self.percentiles, randomise, random_seed
        )

        if land_sea_mask:
            # Fill in masked sea points with uncalibrated data.
            merge_land_and_sea(result, forecast)

        return result


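# Hedged end-to-end usage sketch (not part of the module). `forecast` is
# assumed to be a percentile, probability or realization Cube on the same
# domain and validity time as the training data used to generate
# `coefficients`:
#
#     result = ApplyEMOS(percentiles=[25, 50, 75])(
#         forecast,
#         coefficients,
#         realizations_count=11,
#     )
#
# The calibrated output matches the format of the input forecast.

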
def get_forecast_type(forecast: Cube) -> str:
    """Identify whether the forecast is in probability, realization or
    percentile space.

    Args:
        forecast

    Returns:
        One of "probabilities", "realizations" or "percentiles".
    """
    try:
        find_percentile_coordinate(forecast)
    except CoordinateNotFoundError:
        if forecast.name().startswith("probability_of"):
            forecast_type = "probabilities"
        else:
            forecast_type = "realizations"
    else:
        forecast_type = "percentiles"
    return forecast_type


def convert_to_realizations(
    forecast: Cube, realizations_count: Optional[int], ignore_ecc_bounds: bool
) -> Cube:
    """Convert an input forecast of probabilities or percentiles into
    pseudo-realizations.

    Args:
        forecast
        realizations_count:
            Number of pseudo-realizations to generate from the input
            forecast.
        ignore_ecc_bounds:
            If True, allow percentiles from probabilities to exceed the ECC
            bounds range. If input is not probabilities, this is ignored.

    Returns:
        Cube with pseudo-realizations.

    Raises:
        ValueError: If realizations_count is not defined.
    """
    input_forecast_type = get_forecast_type(forecast)
    if not realizations_count:
        raise ValueError(
            "The 'realizations_count' argument must be defined "
            f"for forecasts provided as {input_forecast_type}"
        )

    if input_forecast_type == "probabilities":
        conversion_plugin = ConvertProbabilitiesToPercentiles(
            ecc_bounds_warning=ignore_ecc_bounds
        )
    if input_forecast_type == "percentiles":
        conversion_plugin = ResamplePercentiles(ecc_bounds_warning=ignore_ecc_bounds)

    forecast_as_percentiles = conversion_plugin(
        forecast, no_of_percentiles=realizations_count
    )
    forecast_as_realizations = RebadgePercentilesAsRealizations()(
        forecast_as_percentiles
    )
    return forecast_as_realizations


def get_attribute_from_coefficients(
    coefficients: CubeList, attribute_name: str, optional: bool = False
) -> Optional[Any]:
    """Get the value for the requested attribute, ensuring that the
    attribute is present consistently across the cubes within the
    coefficients cubelist.

    Args:
        coefficients:
            EMOS coefficients.
        attribute_name:
            Name of expected attribute.
        optional:
            Indicate whether the attribute is allowed to be optional.

    Returns:
        None if the attribute is not present and optional is True.
        Otherwise, the value of the attribute is returned.

    Raises:
        AttributeError: If the expected attribute is not on all coefficients
            cubes.
        AttributeError: If the expected attribute is not the same across all
            coefficients cubes.
    """
    attributes = [
        str(c.attributes[attribute_name])
        for c in coefficients
        if c.attributes.get(attribute_name) is not None
    ]

    if not attributes and optional:
        return None
    if not attributes and not optional:
        msg = (
            f"The {attribute_name} attribute must be specified on all "
            "coefficients cubes."
        )
        raise AttributeError(msg)

    if len(set(attributes)) == 1 and len(attributes) == len(coefficients):
        return coefficients[0].attributes[attribute_name]

    msg = (
        "Coefficients must share the same {0} attribute. "
        "{0} attributes provided: {1}".format(attribute_name, attributes)
    )
    raise AttributeError(msg)


def generate_forecast_from_distribution(
    distribution: Dict,
    template: Cube,
    percentiles: Optional[Sequence],
    randomise: bool,
    random_seed: Optional[int],
) -> Cube:
    """
    Generate calibrated probability, percentile or realization output in
    the desired format.

    Args:
        distribution:
            A dictionary with the following key-value pairs:

            - "name": the name of the distribution
            - "location": a cube of location parameters
            - "scale": a cube of scale parameters
            - "shape": an optional array of shape parameters
        template:
            A template cube containing the coordinates and metadata expected
            on the calibrated forecast.
        percentiles:
            The set of percentiles used to create the calibrated forecast if
            the template cube contains percentile data.
        randomise:
            If True, order realization output randomly rather than using the
            input forecast. If forecast type is not realizations, this is
            ignored.
        random_seed:
            For realizations input, if randomise is True, random seed for
            generating re-ordered percentiles. If randomise is False, the
            random seed may still be used for splitting ties.

    Returns:
        Calibrated forecast
    """
    output_forecast_type = get_forecast_type(template)

    if output_forecast_type == "probabilities":
        conversion_plugin = ConvertLocationAndScaleParametersToProbabilities(
            distribution=distribution["name"],
            shape_parameters=distribution["shape"],
        )
        result = conversion_plugin(
            distribution["location"], distribution["scale"], template
        )
    else:
        conversion_plugin = ConvertLocationAndScaleParametersToPercentiles(
            distribution=distribution["name"],
            shape_parameters=distribution["shape"],
        )
        if output_forecast_type == "percentiles":
            perc_coord = find_percentile_coordinate(template)
            result = conversion_plugin(
                distribution["location"],
                distribution["scale"],
                template,
                percentiles=(percentiles if percentiles else perc_coord.points),
            )
        else:
            no_of_percentiles = len(template.coord("realization").points)
            percentiles = conversion_plugin(
                distribution["location"],
                distribution["scale"],
                template,
                no_of_percentiles=no_of_percentiles,
            )
            result = EnsembleReordering().process(
                percentiles,
                template,
                random_ordering=randomise,
                random_seed=random_seed,
            )

    # Preserve cell methods from the template.
    for cm in template.cell_methods:
        result.add_cell_method(cm)

    return result
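

# Hedged example (not part of the module) of the `distribution` dictionary
# consumed by generate_forecast_from_distribution. The cube variables are
# assumed to be the outputs of CalibratedForecastDistributionParameters:
#
#     distribution = {
#         "name": "norm",
#         "location": location_parameter_cube,
#         "scale": scale_parameter_cube,
#         "shape": None,  # e.g. np.array([0, np.inf]) for "truncnorm"
#     }
#     calibrated = generate_forecast_from_distribution(
#         distribution, template_cube, percentiles=None,
#         randomise=False, random_seed=None,
#     )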