Source code for improver.calibration.samos_calibration

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""
This module defines all the "plugins" specific to Standardised Anomaly Model Output
Statistics (SAMOS).
"""

import warnings
from typing import Dict, List, Optional, Sequence, Tuple

import iris
import iris.pandas
import pandas as pd

try:
    import pygam
except ModuleNotFoundError:
    # Define dummy class to avoid type hint errors.
[docs] class pygam:
[docs] def GAM(self): pass
from iris.analysis import MEAN, STD_DEV, SUM from iris.cube import Cube, CubeList from iris.util import new_axis from numpy import array, clip, float32, int64, isnan, nan from numpy.ma import masked_all_like from improver import BasePlugin, PostProcessingPlugin from improver.calibration import add_static_feature_from_cube_to_df from improver.calibration.emos_calibration import ( ApplyEMOS, EstimateCoefficientsForEnsembleCalibration, convert_to_realizations, generate_forecast_from_distribution, get_attribute_from_coefficients, get_forecast_type, ) from improver.ensemble_copula_coupling.utilities import get_bounds_of_distribution from improver.metadata.probabilistic import find_threshold_coordinate from improver.utilities.cube_manipulation import collapse_realizations from improver.utilities.generalized_additive_models import GAMFit, GAMPredict from improver.utilities.mathematical_operations import CalculateClimateAnomalies # Setting to allow cubes with more than 2 dimensions to be converted to/from dataframes. iris.FUTURE.pandas_ndim = True
[docs] def prepare_data_for_gam( input_cube: Cube, additional_fields: Optional[CubeList] = None, unique_site_id_key: Optional[str] = None, ) -> pd.DataFrame: """ Convert input cubes in to a single, combined dataframe. Each of the input cubes is converted to a pandas dataframe. The dataframe derived from input_cube then forms the left in a series of left dataframe joins with those derived from each cube in additional_fields. The x and y coordinates are used to perform this join. This means that the resulting combined dataframe will contain all of the sites/grid points in input_cube, but not any other sites/grid points in the additional_fields cubes. Args: input_cube: A cube of forecast or observation data. additional_fields: Additional cubes with points which can be matched with points in input_cube by matching spatial coordinate values. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. Returns: A pandas dataframe with rows equal to the number of sites/grid points in input_cube and containing the following columns: 1. A column with the same name as input_cube containing the original cube data 2. A series of columns derived from the input_cube dimension coordinates 3. A series of columns associated with any auxiliary coordinates (scalar or otherwise) of input_cube 4. One column associated with each of the cubes in additional cubes, with column names matching the associated cube """ df = iris.pandas.as_data_frame( input_cube, add_aux_coords=True, add_cell_measures=True, add_ancillary_variables=True, ) df.reset_index(inplace=True) if additional_fields: # Check if we are dealing with spot data. site_data = "spot_index" in [c.name() for c in input_cube.coords()] # For site data we should use unique IDs wherever possible. As a # fallback we can match on latitude, longitude and altitude. We # need altitude to accommodate e.g. sites with a lower and upper # forecast altitude. If we are working with gridded data we use the # spatial coordinates of the input cube. if site_data and unique_site_id_key is not None: match_coords = [unique_site_id_key] elif site_data: match_coords = ["latitude", "longitude", "altitude"] else: match_coords = [ input_cube.coord(axis="X").name(), input_cube.coord(axis="Y").name(), ] for cube in additional_fields: df = add_static_feature_from_cube_to_df( df, cube, feature_name=cube.name(), possible_merge_columns=match_coords, float_decimals=4, ) return df
[docs] def convert_dataframe_to_cube( df: pd.DataFrame, template_cube: Cube, ) -> Cube: """Function to convert a Pandas dataframe to Iris cube format by using a template cube. The input template_cube provides all metadata for the output. Args: df: A Pandas dataframe which must contain at least the following columns: 1. A column matching the name of template_cube 2. A series of columns with names which match the dimension coordinates on template_cube. The data in these columns should match the points on the corresponding dimension of template_cube. template_cube: A cube which will provide all metadata for the output cube Returns: A copy of template_cube containing data from df. """ dim_coords = [c.name() for c in template_cube.coords(dim_coords=True)] diagnostic = template_cube.name() indexed_df = df.set_index(dim_coords, inplace=False) indexed_df.sort_index(inplace=True) # The as_cubes() function returns a cubelist. In this case, the cubelist contains # only one element. converted_cube = iris.pandas.as_cubes(indexed_df[[diagnostic]])[0] result = template_cube.copy(data=converted_cube.data) return result
[docs] def get_climatological_stats( input_cube: Cube, gams: List, gam_features: List[str], additional_cubes: Optional[CubeList], sd_clip: float = 0.25, unique_site_id_key: Optional[str] = None, ) -> Tuple[Cube, Cube]: """Function to predict climatological means and standard deviations given fitted GAMs for each statistic and cubes which can be used to construct a dataframe containing all required features for those GAMs. Args: input_cube gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in input_cube and the second predicting the climatological standard deviation. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_cubes. The index of each feature should match the indices used in model_specification. additional_cubes: Additional fields to use as supplementary predictors. sd_clip: The minimum standard deviation value to allow when predicting from the GAM. Any predictions below this value will be set to this value. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. Returns: A pair of cubes containing climatological mean and climatological standard deviation predictions respectively. """ diagnostic = input_cube.name() df = prepare_data_for_gam( input_cube, additional_cubes, unique_site_id_key=unique_site_id_key ) # Calculate climatological means and standard deviations using previously # fitted GAMs. mean_pred = GAMPredict().process(gams[0], df[gam_features]) sd_pred = GAMPredict().process(gams[1], df[gam_features]) # Convert means and standard deviations into cubes df[diagnostic] = mean_pred mean_cube = convert_dataframe_to_cube(df, input_cube) df[diagnostic] = sd_pred sd_cube = convert_dataframe_to_cube(df, input_cube) sd_cube.data = clip(sd_cube.data, a_min=sd_clip, a_max=None) return mean_cube, sd_cube
[docs] class TrainGAMsForSAMOS(BasePlugin): """ Class for fitting Generalised Additive Models (GAMs) to training data for use in a Standardised Anomaly Model Output Statistics (SAMOS) calibration scheme. Two GAMs are trained: one modelling the mean of the training data and one modelling the standard deviation. These can then be used to convert forecasts or observations to climatological anomalies. This plugin should be run separately for forecast and observation data. """
[docs] def __init__( self, model_specification: list[list[str], list[int], dict], max_iter: int = 100, tol: float = 0.0001, distribution: str = "normal", link: str = "identity", fit_intercept: bool = True, window_length: int = 11, valid_rolling_window_fraction: float = 0.5, unique_site_id_key: Optional[str] = None, ): """ Initialize the class. Args: model_specification: A list of lists which each contain three items (in order): 1. a string containing a single pyGAM term; one of 'linear', 'spline', 'tensor', or 'factor'. 2. a list of integers which correspond to the features to be included in that term. 3. a dictionary of kwargs to be included when defining the term. max_iter: A pyGAM argument which determines the maximum iterations allowed when fitting the GAM. tol: A pyGAM argument determining the tolerance used to define the stopping criteria. distribution: A pyGAM argument determining the distribution to be used in the model. link: A pyGAM argument determining the link function to be used in the model. fit_intercept: A pyGAM argument determining whether to include an intercept term in the model. window_length: This must be an odd integer greater than 1. The length of the rolling window used to calculate the mean and standard deviation of the input cube when the input cube does not have a realization dimension coordinate. valid_rolling_window_fraction: This must be a float between 0 and 1, inclusive. When performing rolling window calculations, if a given window has less than this fraction of valid data points (not NaN) then the value returned will be NaN and will be excluded from training. unique_site_id_key: An optional key to use for uniquely identifying each site in the training data. If not provided, the default behavior is to use the spatial coordinates (latitude, longitude) of each site. """ self.model_specification = model_specification self.max_iter = max_iter self.tol = tol self.distribution = distribution self.link = link self.fit_intercept = fit_intercept self.unique_site_id_key = unique_site_id_key if window_length < 3 or window_length % 2 == 0 or window_length % 1 != 0: raise ValueError( "The window_length input must be an odd integer greater than 1. " f"Received: {window_length}." ) else: self.window_length = window_length if not (0 <= valid_rolling_window_fraction <= 1): raise ValueError( "The valid_rolling_window_fraction input must be between 0 and 1. " f"Received: {valid_rolling_window_fraction}." ) else: self.valid_rolling_window_fraction = valid_rolling_window_fraction
[docs] def apply_aggregator( self, padded_cube: Cube, aggregator: iris.analysis.WeightedAggregator ) -> Cube: """ Internal function to apply rolling window aggregator to padded cube. Args: padded_cube: The cube to have rolling window calculation applied to. aggregator: The aggregator to use in the rolling window calculation. Returns: A cube containing the result of the rolling window calculation. Any cell methods and time bounds are removed from the cube as they are not necessary for later calculations. """ summary_cube = padded_cube.rolling_window( coord="time", aggregator=aggregator, window=self.window_length ) summary_cube.cell_methods = () summary_cube.coord("time").bounds = None summary_cube.coord("time").points = summary_cube.coord("time").points.astype( int64 ) summary_cube.data = summary_cube.data.filled(nan) return summary_cube
[docs] def calculate_statistic_by_rolling_window(self, input_cube: Cube): """Function to calculate mean and standard deviation of input_cube using a rolling window calculation over the time coordinate. The input_cube time coordinate is padded at the beginning and end of the time coordinate, to ensure that the result of the rolling window calculation has the same shape as input_cube. Additionally, any missing time points in the input cube are filled with masked data, so that the rolling window is always taken over a period containing an equal number of time points. """ removed_coords = [] pad_width = int((self.window_length - 1) / 2) # Define the minimum number of valid points required in each rolling window. # This threshold does not have to be an integer. allowed_valid_count = self.window_length * self.valid_rolling_window_fraction # Pad the time coordinate of the input cube, then calculate the mean and # standard deviation using a rolling window over the time coordinate. time_coord = input_cube.coord("time") increments = time_coord.points[1:] - time_coord.points[:-1] min_increment = increments.min() if not all(x % min_increment == 0 for x in increments): raise ValueError( "The increments between points in the time coordinate of the input " "cube must be divisible by the smallest increment between points to " "allow for rolling window calculations to be performed over the time " "coordinate. The increments between points in the time coordinate " f"were: {increments}. The smallest increment was: {min_increment}." ) padded_cube = input_cube.copy() # Check if we are dealing with a period diagnostic. if padded_cube.coord("time").bounds is not None: bounds_width = ( padded_cube.coord("time").bounds[0][1] - padded_cube.coord("time").bounds[0][0] ) else: bounds_width = None # Remove time related coordinates other than the coordinate called # "time" on the input cube in order to allow extension of the "time" # coordinate. These coordinates are saved and added back to the output # cubes. for coord in [ "forecast_reference_time", "forecast_period", "blend_time", ]: if padded_cube.coords(coord): removed_coords.append(padded_cube.coord(coord).copy()) padded_cube.remove_coord(coord) # Create slices of artificial cube data to pad the existing cube time # coordinate and fill any gaps. This ensures that all the time points # are equally spaced and the padding ensures that the output of the # rolling window calculation is the same shape as the input cube. existing_points = time_coord.points desired_points = array( [ x for x in range( min(existing_points) - (min_increment * pad_width), max(existing_points) + (min_increment * pad_width) + 1, min_increment, ) ], dtype=int64, ) # Slice input_cube over time dimension so that the artificial cubes can be # concatenated correctly. padded_cubelist = iris.cube.CubeList([]) for cslice in padded_cube.slices_over("time"): padded_cubelist.append(new_axis(cslice.copy(), "time")) # For each desired point which doesn't already correspond to a time point # on the cube, create a new cube slice with that time point with all data # masked. cslice = padded_cube.extract(iris.Constraint(time=time_coord.cell(0))) for point in desired_points: if point not in existing_points: # Create a new cube slice with time point equal to point. new_slice = cslice.copy() new_slice.coord("time").points = point if bounds_width: # Add correct bounds to point if required. new_slice.coord("time").bounds = [point - bounds_width, point] new_slice = new_axis(new_slice, "time") new_slice.data = masked_all_like(new_slice.data) padded_cubelist.append(new_slice) padded_cube = padded_cubelist.concatenate_cube() # A data sufficiency check is performed to ensure that each rolling window # contains enough valid data points to produce a sensible result. Data is # considered sufficient if at least 50% of the data points in the window are # valid. Where data is considered insufficient, the result of the rolling # window calculation is replaced with nan. check_data = padded_cube.copy() check_data.data = (~isnan(check_data.data)).astype(float32) valid_count = self.apply_aggregator(check_data, SUM) aggregated_cubes = {} for aggregator in [MEAN, STD_DEV]: aggregated_cube = self.apply_aggregator(padded_cube, aggregator) aggregated_cube.data[valid_count.data < allowed_valid_count] = nan aggregated_cubes[aggregator.name()] = aggregated_cube.copy() # Create constraint to extract only those time points which were present in # the original input cube. constr = iris.Constraint( time=lambda cell: cell.point in input_cube.coord("time").cells() ) input_mean = aggregated_cubes["mean"].extract(constr) input_sd = aggregated_cubes["standard_deviation"].extract(constr) # Add any removed time coordinates back on to the mean and standard # deviation cubes. for coord in removed_coords: time_dim = padded_cube.coord_dims("time") kwargs = {"data_dims": time_dim} if len(coord.points) > 1 else {} input_mean.add_aux_coord(coord, **kwargs) input_sd.add_aux_coord(coord, **kwargs) return input_mean, input_sd
[docs] def calculate_cube_statistics(self, input_cube: Cube) -> CubeList: """Function to calculate mean and standard deviation of the input cube. If the cube has a realization dimension then statistics will be calculated by collapsing over this dimension. Otherwise, a rolling window calculation over the time dimension will be used. The rolling window method calculates a statistic over data in a fixed time window and assigns the value of the statistic to the central time in the window. For example, for data points [0.0, 1.0, 2.0, 1.0, 0.0] each valid in consecutive hours T+0, T+1, T+2, T+3, T+4, the mean calculated by a rolling window of width 5 would be 0.8. This value would be associated with T+2 in the resulting cube. To enable this calculation to produce a cube of the same dimensions as input_cube, the data in input_cube is first padded with additional data. For a rolling window of width 5, 2 data slices are added to the start and end of the input_cube time coordinate. The data in these slices are masked so that they don't affect the calculated statistics. Args: input_cube: A cube with at least one of the following coordinates: 1. A realization dimension coordinate 2. A time coordinate with more than one point and evenly spaced points. Returns: CubeList containing a mean cube and standard deviation cube. Raises: ValueError: If input_cube does not contain a realization coordinate and does contain a time coordinate with unevenly spaced points. """ if input_cube.coords("realization"): input_mean = collapse_realizations(input_cube, method="mean") input_sd = collapse_realizations(input_cube, method="std_dev") else: input_mean, input_sd = self.calculate_statistic_by_rolling_window( input_cube ) return CubeList([input_mean, input_sd])
[docs] def process( self, input_cube: Cube, features: List[str], additional_fields: Optional[CubeList] = None, ) -> List: """ Function to fit GAMs to model the mean and standard deviation of the input_cube for use in SAMOS. Args: input_cube: Historic forecasts or observations from the training dataset. Must contain at least one of: - a realization coordinate - a time coordinate with more than one point and equally spaced points features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_fields. The index of each feature should match the indices used in model_specification. additional_fields: Additional fields to use as supplementary predictors. Returns: A list containing fitted GAMs which model the input_cube mean and standard deviation. Raises: ValueError: If input_cube does not contain at least one of a realization or time coordinate. ValueError: If the input cube does not have a realization coordinate and the time coordinate that it does have contains only one point. """ if not input_cube.coords("realization", dim_coords=True): if not input_cube.coords("time"): msg = ( "The input cube must contain at least one of a realization or time " "coordinate in order to allow the calculation of means and " "standard deviations. The following coordinates were found: " f"{input_cube.coords()}." ) raise ValueError(msg) elif len(input_cube.coord("time").points) == 1: msg = ( "The input cube does not contain a realization coordinate. In " "order to calculate means and standard deviations the time " "coordinate must contain more than one point. The following time " f"coordinate was found: {input_cube.coord('time')}." ) warnings.warn(msg) return None # Calculate mean and standard deviation from input cube. stat_cubes = self.calculate_cube_statistics(input_cube) # Create list to put fitted GAM models in. output = [] # Initialize plugin used to fit GAMs. plugin = GAMFit( model_specification=self.model_specification, max_iter=self.max_iter, tol=self.tol, distribution=self.distribution, link=self.link, fit_intercept=self.fit_intercept, ) for stat_cube in stat_cubes: df = prepare_data_for_gam( stat_cube, additional_fields, unique_site_id_key=self.unique_site_id_key ) feature_values = df[features].values targets = df[input_cube.name()].values output.append(plugin.process(feature_values, targets)) if None in output: return None return output
[docs] class TrainEMOSForSAMOS(BasePlugin): """Class to calculate Ensemble Model Output Statistics (EMOS) coefficients to calibrate climate anomaly forecasts given training data including forecasts and verifying observations and four Generalized Additive Models (GAMs) which model: - forecast mean, - forecast standard deviation, - observation mean, - observation standard deviation. This class first calculates climatological means and standard deviations by predicting them from the input GAMs. Following this, the input forecasts and observations are converted to climatological anomalies using the predicted means and standard deviations. Finally, EMOS coefficients are calculated from the climatological anomaly training data. """
[docs] def __init__( self, distribution: str, emos_kwargs: Optional[Dict] = None, unique_site_id_key: Optional[str] = None, ) -> None: """Initialize the class. Args: distribution: Name of distribution. Assume that a calibrated version of the climate anomaly forecast could be represented using this distribution. emos_kwargs: Keyword arguments accepted by the EstimateCoefficientsForEnsembleCalibration plugin. Should not contain a distribution argument. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. """ self.distribution = distribution self.emos_kwargs = emos_kwargs if emos_kwargs else {} self.unique_site_id_key = unique_site_id_key
[docs] def climate_anomaly_emos( self, forecast_cubes: List[Cube], truth_cubes: List[Cube], additional_fields: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> CubeList: """Function to convert forecasts and truths to climate anomalies then calculate EMOS coefficients for the climate anomalies. Args: forecast_cubes: A list of three cubes: a cube containing historic forecasts, a cube containing climatological mean predictions of the forecasts and a cube containing climatological standard deviation predictions of the forecasts. truth_cubes: A list of three cubes: a cube containing historic truths, a cube containing climatological mean predictions of the truths and a cube containing climatological standard deviation predictions of the truths. additional_fields: Additional fields to use as supplementary predictors. landsea_mask: The optional cube containing a land-sea mask. If provided, only land points are used to calculate the coefficients. Within the land-sea mask cube land points should be specified as ones, and sea points as zeros. Returns: CubeList constructed using the coefficients provided and using metadata from the historic_forecasts cube. Each cube within the cubelist is for a separate EMOS coefficient e.g. alpha, beta, gamma, delta. """ # Convert forecasts and truths to climatological anomalies. forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( *forecast_cubes ) truth_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( *truth_cubes ) plugin = EstimateCoefficientsForEnsembleCalibration( distribution=self.distribution, **self.emos_kwargs, ) return plugin.process( historic_forecasts=forecast_ca, truths=truth_ca, additional_fields=additional_fields, landsea_mask=landsea_mask, )
[docs] def process( self, historic_forecasts: Cube, truths: Cube, forecast_gams: List, truth_gams: List, gam_features: List[str], gam_additional_fields: Optional[CubeList] = None, emos_additional_fields: Optional[CubeList] = None, landsea_mask: Optional[Cube] = None, ) -> CubeList: """Function to convert historic forecasts and truths to climatological anomalies, then fit EMOS coefficients to these anomalies. Args: historic_forecasts: Historic forecasts from the training dataset. truths: Truths from the training dataset. forecast_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in historic_forecasts and the second predicting the climatological standard deviation. Appropriate GAMs are produced by the TrainGAMsForSAMOS plugin. truth_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the locations in truths and the second predicting the climatological standard deviation. Appropriate GAMs are produced by the TrainGAMsForSAMOS plugin. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in gam_additional_fields. The index of each feature must match the indices used in model_specification. gam_additional_fields: Additional fields to use as supplementary predictors in the GAMs. emos_additional_fields: Additional fields to use as supplementary predictors in EMOS. landsea_mask: The optional cube containing a land-sea mask. If provided, only land points are used to calculate the EMOS coefficients. Within the land-sea mask cube land points should be specified as ones, and sea points as zeros. Returns: CubeList constructed using the coefficients provided and using metadata from the historic_forecasts cube. Each cube within the cubelist is for a separate EMOS coefficient e.g. alpha, beta, gamma, delta. """ forecast_mean, forecast_sd = get_climatological_stats( historic_forecasts, forecast_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, ) truth_mean, truth_sd = get_climatological_stats( truths, truth_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, ) emos_coefficients = self.climate_anomaly_emos( forecast_cubes=[historic_forecasts, forecast_mean, forecast_sd], truth_cubes=[truths, truth_mean, truth_sd], additional_fields=emos_additional_fields, landsea_mask=landsea_mask, ) return emos_coefficients
[docs] class ApplySAMOS(PostProcessingPlugin): """ Class to calibrate an input forecast using SAMOS given the following inputs: - Two GAMs which model, respectively, the climatological mean and standard deviation of the forecast. This allows the forecast to be converted to climatological anomalies. - A set of EMOS coefficients which can be applied to correct the climatological anomalies. """
[docs] def __init__( self, percentiles: Optional[Sequence] = None, unique_site_id_key: Optional[str] = None, ): """Initialize class. Args: percentiles: The set of percentiles used to create the calibrated forecast. unique_site_id_key: If working with spot data and available, the name of the coordinate in the input cubes that contains unique site IDs, e.g. "wmo_id" if all sites have a valid wmo_id. """ self.percentiles = [float32(p) for p in percentiles] if percentiles else None self.unique_site_id_key = unique_site_id_key
[docs] def transform_anomalies_to_original_units( self, location_parameter: Cube, scale_parameter: Cube, truth_mean: Cube, truth_sd: Cube, forecast: Cube, input_forecast_type: str, ) -> None: """Function to transform location and scale parameters which describe a climatological anomaly distribution to location and scale parameters which describe a distribution in the units of the original forecast. Both parameter cubes are modified in place. Predictions of mean and standard deviation from the 'truth' GAMs are used for this transformation. This ensures that the calibrated forecast follows the 'true' distribution, rather than the distribution of the original forecast, following the suggested method in: Dabernig, M., Mayr, G.J., Messner, J.W. and Zeileis, A. (2017). Spatial ensemble post-processing with standardized anomalies. Q.J.R. Meteorol. Soc, 143: 909-916. https://doi.org/10.1002/qj.2975 Args: location_parameter: Cube containing the location parameter of the climatological anomaly distribution. This is modified in place. scale_parameter: Cube containing the scale parameter of the climatological anomaly distribution. This is modified in place. truth_mean: Cube containing climatological mean predictions of the truths. truth_sd: Cube containing climatological standard deviation predictions of the truths. forecast: The original, uncalibrated forecast. input_forecast_type: The type of the original, uncalibrated forecast. One of 'realizations', 'percentiles' or 'probabilities'. """ # The data in these cubes are identical along the realization dimensions. truth_mean = next(truth_mean.slices_over("realization")) truth_sd = next(truth_sd.slices_over("realization")) # Transform location and scale parameters so that they represent a distribution # in the units of the original forecast, rather than climatological anomalies. forecast_units = ( forecast.units if input_forecast_type != "probabilities" else find_threshold_coordinate(forecast).units ) location_parameter.data = ( location_parameter.data * truth_sd.data ) + truth_mean.data location_parameter.units = forecast_units # The scale parameter returned by ApplyEMOS is the standard deviation for a # normal distribution. To get the desired standard deviation in # realization/percentile space we must multiply by the estimated forecast # standard deviation. scale_parameter.data = scale_parameter.data * truth_sd.data scale_parameter.units = forecast_units
[docs] def process( self, forecast: Cube, forecast_gams: List, truth_gams: List, gam_features: List[str], emos_coefficients: CubeList, gam_additional_fields: Optional[CubeList] = None, emos_additional_fields: Optional[CubeList] = None, prob_template: Optional[Cube] = None, realizations_count: Optional[int] = None, ignore_ecc_bounds: bool = True, tolerate_time_mismatch: bool = False, predictor: str = "mean", randomise: bool = False, random_seed: Optional[int] = None, ): """Calibrate input forecast using GAMs to convert the forecast to climatological anomalies and pre-calculated EMOS coefficients to apply to those anomalies. Args: forecast: Uncalibrated forecast as probabilities, percentiles or realizations. forecast_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the historic forecasts at each location and the second predicting the climatological standard deviation. truth_gams: A list containing two fitted GAMs, the first for predicting the climatological mean of the truths at each location and the second predicting the climatological standard deviation. gam_features: The list of features. These must be either coordinates on input_cube or share a name with a cube in additional_cubes. The index of each feature should match the indices used in model_specification. emos_coefficients: EMOS coefficients. gam_additional_fields: Additional fields to use as supplementary predictors in the GAMs. emos_additional_fields: Additional fields to use as supplementary predictors in EMOS. prob_template: A cube containing a probability forecast that will be used as a template when generating probability output when the input format of the forecast cube is not probabilities i.e. realizations or percentiles. realizations_count: Number of realizations to use when generating the intermediate calibrated forecast from probability or percentile inputs ignore_ecc_bounds: If True, allow percentiles from probabilities to exceed the ECC bounds range. If input is not probabilities, this is ignored. tolerate_time_mismatch: If True, tolerate a mismatch in validity time and forecast period for coefficients vs forecasts. Use with caution! predictor: Predictor to be used to calculate the location parameter of the calibrated distribution. Value is "mean" or "realizations". randomise: Used in generating calibrated realizations. If input forecast is probabilities or percentiles, this is ignored. random_seed: Used in generating calibrated realizations. If input forecast is probabilities or percentiles, this is ignored. Returns: Calibrated forecast in the form of the input (ie probabilities percentiles or realizations). """ input_forecast_type = get_forecast_type(forecast) forecast_as_realizations = forecast.copy() if input_forecast_type != "realizations": forecast_as_realizations = convert_to_realizations( forecast.copy(), realizations_count, ignore_ecc_bounds ) forecast_mean, forecast_sd = get_climatological_stats( forecast_as_realizations, forecast_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, ) forecast_ca = CalculateClimateAnomalies(ignore_temporal_mismatch=True).process( diagnostic_cube=forecast_as_realizations, mean_cube=forecast_mean, std_cube=forecast_sd, ) # Returns parameters which describe a climate anomaly distribution. location_parameter, scale_parameter = ApplyEMOS( percentiles=self.percentiles ).process( forecast=forecast_ca, coefficients=emos_coefficients, additional_fields=emos_additional_fields, prob_template=prob_template, realizations_count=realizations_count, ignore_ecc_bounds=ignore_ecc_bounds, tolerate_time_mismatch=tolerate_time_mismatch, predictor=predictor, randomise=randomise, random_seed=random_seed, return_parameters=True, ) truth_mean, truth_sd = get_climatological_stats( forecast_as_realizations, truth_gams, gam_features, gam_additional_fields, unique_site_id_key=self.unique_site_id_key, ) # Transform location and scale parameters to be in the same units as the # original forecast. self.transform_anomalies_to_original_units( truth_mean=truth_mean, truth_sd=truth_sd, location_parameter=location_parameter, scale_parameter=scale_parameter, forecast=forecast, input_forecast_type=input_forecast_type, ) # Generate output in desired format from distribution. self.distribution = { "name": get_attribute_from_coefficients(emos_coefficients, "distribution"), "location": location_parameter, "scale": scale_parameter, "shape": get_attribute_from_coefficients( emos_coefficients, "shape_parameters", optional=True ), } template = prob_template if prob_template else forecast result = generate_forecast_from_distribution( self.distribution, template, self.percentiles, randomise, random_seed ) if input_forecast_type != "probabilities" and not prob_template: # Enforce that the result is within sensible bounds. bounds_pairing = get_bounds_of_distribution( bounds_pairing_key=result.name(), desired_units=result.units ) result.data = clip( result.data, a_min=bounds_pairing[0], a_max=bounds_pairing[1] ) # Enforce correct dtype. result.data = result.data.astype(dtype=float32) return result