Source code for improver.calibration.rainforest_calibration

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""RainForests calibration Plugins.

.. Further information is available in:
.. include:: extended_documentation/calibration/rainforests_calibration/
   rainforests_calibration.rst

"""

import os
import typing
import warnings
from collections import OrderedDict
from pathlib import Path
from typing import Literal

import numpy as np
from cf_units import Unit
from iris.analysis import MEAN
from iris.coords import DimCoord
from iris.cube import Cube, CubeList
from numpy import ndarray

from improver import PostProcessingPlugin
from improver.constants import MINUTES_IN_HOUR, SECONDS_IN_MINUTE
from improver.ensemble_copula_coupling.utilities import (
    get_bounds_of_distribution,
    interpolate_multiple_rows_same_x,
)
from improver.metadata.utilities import (
    create_new_diagnostic_cube,
    generate_mandatory_attributes,
)
from improver.utilities.cube_manipulation import add_coordinate_to_cube, compare_coords

Model = Literal["lightgbm_model", "treelite_model"]


[docs] def treelite_packages_available(): """Return True if treelite packages are available, False otherwise.""" try: import tl2cgen # noqa: F401 import treelite # noqa: F401 except ModuleNotFoundError: return False return True
[docs] def lightgbm_package_available(): """Return True if LightGBM package is available, False otherwise.""" try: import lightgbm # noqa: F401 except ModuleNotFoundError: return False return True
[docs] class ModelFileNotFoundError(Exception): """Used when the path to a treelite/LightGBM model object is invalid.""" pass
[docs] class ApplyRainForestsCalibration(PostProcessingPlugin): """Generic class to calibrate input forecast via RainForests. The choice of tree-model library is determined from package availability, and whether all required models files are available. Treelite is the preferred option, defaulting to lightGBM if requirements are missing. """ def __new__( cls, model_config_dict: dict[str, dict[str, dict[str, str]]], threads: int | None = None, bin_data: bool = False, ): """Initialise class object based on package and model file availability. Args: model_config_dict: Dictionary containing Rainforests model configuration variables. threads: Number of threads to use during prediction with tree-model objects. If unset, use the default number of threads used by Treelite or LightGBM, depending on which library is used. bin_data: Bin data according to splits used in models. This speeds up prediction if there are many data points which fall into the same bins for all threshold models. Limits the calculation of common feature values by only calculating them once. Defaults to False. Dictionary is of format:: { "24": { "0.000010": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>" }, "0.000050": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>" }, "0.000100": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>" }, } The keys specify the lead times and model threshold values, while the associated values are the path to the corresponding tree-model objects for that lead time and threshold. Treelite predictors are used if treelite_runtime is an installed dependency and an associated path has been provided for all thresholds, otherwise LightGBM Boosters are used as the default tree model type. """ treelite_available = treelite_packages_available() lightgbm_available = lightgbm_package_available() if not treelite_available and not lightgbm_available: raise ModuleNotFoundError("Could not find treelite or LightGBM modules") if treelite_available: try: cls = ApplyRainForestsCalibrationTreelite # Check that all required files have been specified. ApplyRainForestsCalibration.check_filenames( "treelite_model", model_config_dict ) return super(ApplyRainForestsCalibration, cls).__new__(cls) except ModelFileNotFoundError as e: # Treelite files not specified if not lightgbm_available: # Re-raise error if LightGBM unavailable raise (e) if lightgbm_available: if threads is not None: # Workaround to address segfault issue in LightGBM raise RuntimeError( "Manual thread specification is unsupported due to compatibility issues with LightGBM. Please remove the --threads argument, or install Treelite dependencies." ) cls = ApplyRainForestsCalibrationLightGBM # Ensure all required files have been specified. ApplyRainForestsCalibration.check_filenames( "lightgbm_model", model_config_dict ) return super(ApplyRainForestsCalibration, cls).__new__(cls)
[docs] def process(self) -> None: """Subclasses should override this function.""" raise NotImplementedError( "Process function must be called via subclass method." )
[docs] def _get_num_features(self) -> int: """Subclasses should override this function.""" raise NotImplementedError( "_get_num_features must be called via subclass method." )
[docs] def _check_num_features(self, features: CubeList) -> None: """Check that the correct number of features has been passed into the model. Args: features: Cubelist containing feature variables. """ expected_num_features = self._get_num_features() if expected_num_features != len(features): raise ValueError( "Number of expected features does not match number of feature cubes." )
[docs] def _get_feature_splits(self, model_config_dict) -> dict[np.float32, list[ndarray]]: """Get the combined feature splits (over all thresholds) for each lead time. Args: model_config_dict: dictionary of the same format expected by __init__ Returns: dict where keys are the lead times and the values are lists of lists. The outer list has length equal to the number of model features, and it contains the lists of feature splits for each feature. Each feature's list of splits is ordered. """ # These string patterns are defined by LightGBM and are used for finding the feature and # threshold information in the model .txt files. split_feature_string = "split_feature=" feature_threshold_string = "threshold=" combined_feature_splits = {} for lead_time in model_config_dict.keys(): all_splits = [set() for i in range(self._get_num_features())] for threshold_str in model_config_dict[lead_time].keys(): lgb_model_filename = Path( os.path.expandvars( model_config_dict[lead_time][threshold_str].get( "lightgbm_model" ) ) ).expanduser() with open(lgb_model_filename, "r") as f: for line in f: if line.startswith(split_feature_string): line = line[len(split_feature_string) : -1] if len(line) == 0: # This deals with the situation where the tree has no splits continue features = [int(x) for x in line.split(" ")] elif line.startswith(feature_threshold_string): line = line[len(feature_threshold_string) : -1] if len(line) == 0: continue splits = [float(x) for x in line.split(" ")] for feature_ind, threshold in zip(features, splits): all_splits[feature_ind].add(threshold) combined_feature_splits[np.float32(lead_time)] = [ np.sort(list(x)) for x in all_splits ] return combined_feature_splits
[docs] @staticmethod def check_filenames( key_name: Model, model_config_dict: dict[str, dict[str, dict[str, str]]] ): """Check whether files specified by model_config_dict exist, and raise an error if any are missing. Args: key_name: One of "treelite_model" or "lightgbm_model". model_config_dict: Dictionary containing Rainforests model configuration variables. Raises: ValueError: If an invalid value for key_name is used. ModelFileNotFoundError: If the path to the corresponding model is missing for one or more model thresholds in model_config_dict. """ if key_name not in typing.get_args(Model): raise ValueError( f"key_name must be one of the following: {typing.get_args(Model)}" ) model_filenames = [] for lead_time in model_config_dict.keys(): for threshold in model_config_dict[lead_time].keys(): model_filenames.append( model_config_dict[lead_time][threshold].get(key_name) ) if None in model_filenames: if key_name == "lightgbm_model": raise ModelFileNotFoundError( "Path to LightGBM model missing for one or more model thresholds " "in model_config_dict." ) elif key_name == "treelite_model": raise ModelFileNotFoundError( "Path to treelite model missing for one or more model thresholds " "in model_config_dict." )
[docs] def _parse_model_config( self, model_config_dict: dict[str, dict[str, dict[str, str]]] ) -> OrderedDict[np.float32, OrderedDict[np.float32, dict[str, str]]]: """Parse the model config dictionary, set self.lead_times and self.model_thresholds, and return a sorted version of the config dictionary. Args: model_config_dict: Nested dictionary with string keys. Keys of outer level are lead times, and keys of inner level are thresholds. Returns: Dictionary with the same nested structure as model_config_dict, but the lead time and threshold keys now have type np.float. """ sorted_model_config_dict = OrderedDict() for key, lead_time_dict in model_config_dict.items(): sorted_model_config_dict[np.float32(key)] = OrderedDict( sorted({np.float32(k): v for k, v in lead_time_dict.items()}.items()) ) self.lead_times = np.sort(np.array([*sorted_model_config_dict.keys()])) if len(self.lead_times) > 0: self.model_thresholds = np.sort( np.array([*sorted_model_config_dict[self.lead_times[0]].keys()]) ) else: warnings.warn( "Model config does not match the expected specification; calibration will not work" ) self.model_thresholds = np.array([]) return sorted_model_config_dict
[docs] class ApplyRainForestsCalibrationLightGBM(ApplyRainForestsCalibration): """Class to calibrate input forecast given via RainForests approach using light-GBM tree models.""" def __new__( cls, model_config_dict: dict[str, dict[str, dict[str, str]]], threads: int | None = None, bin_data: bool = False, ): """Check all model files are available before initialising.""" ApplyRainForestsCalibration.check_filenames("lightgbm_model", model_config_dict) return super(ApplyRainForestsCalibration, cls).__new__(cls)
[docs] def __init__( self, model_config_dict: dict[str, dict[str, dict[str, str]]], threads: int | None = None, bin_data: bool = False, ): """Initialise the tree model variables used in the application of RainForests Calibration. LightGBM Boosters are used for tree model predictors. Args: model_config_dict: Dictionary containing Rainforests model configuration variables. threads: Number of threads to use during prediction with tree-model objects. Values other than None will currently result in an error. If unspecified, calibration will use the default value defined by LightGBM. bin_data: Bin data according to splits used in models. This speeds up prediction if there are many data points which fall into the same bins for all threshold models. Limits the calculation of common feature values by only calculating them once. Dictionary is of format:: { "24": { "0.000010": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, "0.000050": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, "0.000100": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, } } The keys specify the lead times and model threshold values, while the associated values are the path to the corresponding tree-model objects for that lead time and threshold. """ from lightgbm import Booster sorted_model_config_dict = self._parse_model_config(model_config_dict) self.model_input_converter = np.array self.tree_models = {} for lead_time in self.lead_times: # check all lead times have the same thresholds curr_thresholds = np.array([*sorted_model_config_dict[lead_time].keys()]) if np.any(curr_thresholds != self.model_thresholds): raise ValueError( "The same thresholds must be used for all lead times. " f"Lead time {self.lead_times[0]} has thresholds: {self.model_thresholds}," f"lead time {lead_time} has thresholds: {curr_thresholds}" ) for threshold in self.model_thresholds: model_filename = Path( os.path.expandvars( str( sorted_model_config_dict[lead_time][threshold].get( "lightgbm_model" ) ) ) ).expanduser() booster = Booster(model_file=str(model_filename)) if threads is not None: # Workaround to avoid segfault issue in LightGBM raise RuntimeError( "Manual thread specification is unsupported due to compatibility issues with LightGBM." ) self.tree_models[lead_time, threshold] = booster self.bin_data = bin_data if self.bin_data: self.combined_feature_splits = self._get_feature_splits(model_config_dict)
[docs] def _get_num_features(self) -> int: return next(iter(self.tree_models.values())).num_feature()
[docs] def _align_feature_variables( self, feature_cubes: CubeList, forecast_cube: Cube ) -> tuple[CubeList, Cube]: """Ensure that feature cubes have consistent dimension coordinates. If realization dimension present in any cube, all cubes lacking this dimension will have realization dimension added and broadcast along this new dimension. This situation occurs when derived fields (such as accumulated solar radiation) are used as predictors. As these fields do not contain a realization dimension, they must be broadcast to match the NWP fields that do contain realization, so that all features have consistent shape. In the case of deterministic models (those without a realization dimension), a realization dimension is added to allow consistent behaviour between ensemble and deterministic models. Args: feature_cubes: Cubelist containing feature variables to align. forecast_cube: Cube containing the forecast variable to align. Returns: - feature_cubes with realization coordinate added to each cube if absent - forecast_cube with realization coordinate added if absent Raises: ValueError: if feature/forecast variables have inconsistent dimension coordinates (excluding realization dimension), or if feature/forecast variables have different length realization coordinate over cubes containing a realization coordinate. """ combined_cubes = CubeList(list([*feature_cubes, forecast_cube])) # Compare feature cube coordinates, raise error if dim-coords don't match compare_feature_coords = compare_coords( combined_cubes, ignored_coords=["realization"] ) for misaligned_coords in compare_feature_coords: for coord_info in misaligned_coords.values(): if coord_info["data_dims"] is not None: raise ValueError( "Mismatch between non-realization dimension coords." ) # Compare realization coordinates across cubes where present; # raise error if realization coordinates don't match, otherwise set # common_realization_coord to broadcast over. realization_coords = { variable.name(): variable.coord("realization") for variable in combined_cubes if variable.coords("realization") } if not realization_coords: # Case I: realization_coords is empty. Add single realization dim to all cubes. common_realization_coord = DimCoord( [0], standard_name="realization", units=1 ) else: # Case II: realization_coords is not empty. # Note: In future, another option here could be to filter to common realization # values using filter_realizations() in utilities.cube_manipulation. variables_with_realization = list(realization_coords.keys()) sample_realization = realization_coords[variables_with_realization[0]] for feature in variables_with_realization[1:]: if realization_coords[feature] != sample_realization: raise ValueError("Mismatch between realization dimension coords.") common_realization_coord = sample_realization # Add realization coord to cubes where absent by broadcasting along this dimension aligned_cubes = CubeList() for cube in combined_cubes: if not cube.coords("realization"): expanded_cube = add_coordinate_to_cube( cube, new_coord=common_realization_coord ) aligned_cubes.append(expanded_cube) else: aligned_cubes.append(cube) # Make data contiguous (required for numba interpolation) for cube in aligned_cubes: if not cube.data.flags["C_CONTIGUOUS"]: cube.data = np.ascontiguousarray(cube.data, dtype=cube.data.dtype) return aligned_cubes[:-1], aligned_cubes[-1]
[docs] def _prepare_threshold_probability_cube(self, forecast_cube, thresholds): """Initialise a cube with the same dimensions as the input forecast_cube, with an additional threshold dimension added as the leading dimension. Args: forecast_cube: Cube containing the forecast to be calibrated. thresholds: Points of the the threshold dimension. Returns: An empty probability cube. """ # Create a template for CDF, with threshold the leading dimension. forecast_variable = forecast_cube.name() probability_cube = create_new_diagnostic_cube( name=f"probability_of_{forecast_variable}_above_threshold", units="1", template_cube=forecast_cube, mandatory_attributes=generate_mandatory_attributes([forecast_cube]), optional_attributes=forecast_cube.attributes, ) threshold_coord = DimCoord( thresholds, standard_name=forecast_variable, var_name="threshold", units=forecast_cube.units, attributes={"spp__relative_to_threshold": "greater_than_or_equal_to"}, ) probability_cube = add_coordinate_to_cube( probability_cube, new_coord=threshold_coord ) return probability_cube
[docs] def _prepare_features_array(self, feature_cubes: CubeList) -> ndarray: """Convert gridded feature cubes into a numpy array, with feature variables sorted alphabetically. Note: It is expected that feature_cubes has been aligned using _align_feature_variables prior to calling this function. Args: feature_cubes: Cubelist containing the independent feature variables for prediction. Returns: Array containing flattened feature variables, Raises: ValueError: If flattened cubes have differing length. """ # Get the names of features and sort alphabetically feature_variables = [cube.name() for cube in feature_cubes] feature_variables.sort() # Unpack the cube-data into an array to feed into the tree-models. features_list = [] for feature in feature_variables: cube = feature_cubes.extract_cube(feature) features_list.append(cube.data.ravel()[:, np.newaxis]) features_arr = np.concatenate(features_list, axis=1) return features_arr
[docs] def _make_decreasing(self, probability_data: ndarray) -> ndarray: """Enforce monotonicity on the CDF data, where threshold dimension is assumed to be the leading dimension. This is achieved by identifying the minimum value progressively along the leading dimension by comparing to all preceding probability values along this dimension. The same is done for maximum values, comparing to all succeeding values along the leading dimension. Averaging these resulting arrays results in an array decreasing monotonically in the threshold dimension. Args: probability_data: The probability data as exceedence probabilities. Returns: The probability data, enforced to be monotonically decreasing along the leading dimension. """ lower = np.minimum.accumulate(probability_data, axis=0) upper = np.flip( np.maximum.accumulate(np.flip(probability_data, axis=0), axis=0), axis=0 ) return 0.5 * (upper + lower)
[docs] def _evaluate_probabilities( self, input_data: ndarray, lead_time_hours: int, output_data: ndarray ) -> None: """Evaluate probability that forecast exceeds thresholds. Args: input_data: 2-d array of data for the feature variables of the model lead_time_hours: lead time in hours output_data: array to populate with output; will be modified in place """ if np.float32(lead_time_hours) in self.lead_times: model_lead_time = np.float32(lead_time_hours) else: # find closest model lead time best_ind = np.argmin(np.abs(self.lead_times - lead_time_hours)) model_lead_time = self.lead_times[best_ind] if self.bin_data: # bin by feature splits feature_splits = self.combined_feature_splits[model_lead_time] binned_data = np.empty(input_data.shape, dtype=np.int32) n_features = len(feature_splits) for i in range(n_features): binned_data[:, i] = np.digitize( input_data[:, i], bins=feature_splits[i] ) # sort so rows in the same bins are grouped sort_ind = np.lexsort(tuple([binned_data[:, i] for i in range(n_features)])) sorted_data = binned_data[sort_ind] reverse_sort_ind = np.argsort(sort_ind) # we only need to predict for rows which are different from the previous row diff = np.any(np.diff(sorted_data, axis=0) != 0, axis=1) predict_rows = np.concatenate([[0], np.nonzero(diff)[0] + 1]) data_for_prediction = input_data[sort_ind][predict_rows] full_prediction = np.empty((input_data.shape[0],)) # forward fill code inspired by this: https://stackoverflow.com/a/41191127 fill_inds = np.zeros(len(full_prediction), dtype=np.int32) fill_inds[predict_rows] = predict_rows fill_inds = np.maximum.accumulate(fill_inds) dataset_for_prediction = self.model_input_converter(data_for_prediction) for threshold_index, threshold in enumerate(self.model_thresholds): model = self.tree_models[model_lead_time, threshold] prediction = model.predict(dataset_for_prediction) prediction = np.clip(prediction, 0, 1) if type(self) is ApplyRainForestsCalibrationTreelite: # treelite 4.x.x changed output dimensions, so must flatten here # See https://treelite.readthedocs.io/en/latest/treelite-gtil-api.html#treelite.gtil.predict prediction = prediction.flatten() full_prediction[predict_rows] = prediction full_prediction = full_prediction[fill_inds] # restore original order full_prediction = full_prediction[reverse_sort_ind] output_data[threshold_index, :] = np.reshape( full_prediction, output_data.shape[1:] ) else: dataset_for_prediction = self.model_input_converter(input_data) for threshold_index, threshold in enumerate(self.model_thresholds): model = self.tree_models[model_lead_time, threshold] prediction = model.predict(dataset_for_prediction) prediction = np.clip(prediction, 0, 1) if type(self) is ApplyRainForestsCalibrationTreelite: # treelite 4.x.x changed output dimensions, so must flatten here # See https://treelite.readthedocs.io/en/latest/treelite-gtil-api.html#treelite.gtil.predict prediction = prediction.flatten() output_data[threshold_index, :] = np.reshape( prediction, output_data.shape[1:] )
[docs] def _calculate_threshold_probabilities( self, forecast_cube: Cube, feature_cubes: CubeList ) -> Cube: """Evaluate the threshold exceedence probabilities for each ensemble member in forecast_cube using the tree_models, with the associated feature_cubes taken as inputs to the tree_model predictors. Args: forecast_cube: Cube containing the variable to be calibrated. feature_cubes: Cubelist containing the independent feature variables for prediction. Returns: A cube containing threshold exceedence probabilities. Raises: ValueError: If an unsupported model object is passed. Expects LightGBM Booster, or tl2cgen Predictor (if the latter's dependencies are available). """ threshold_probability_cube = self._prepare_threshold_probability_cube( forecast_cube, self.model_thresholds ) input_dataset = self._prepare_features_array(feature_cubes) lead_time_hours = forecast_cube.coord("forecast_period").points[0] / ( SECONDS_IN_MINUTE * MINUTES_IN_HOUR ) self._evaluate_probabilities( input_dataset, lead_time_hours, threshold_probability_cube.data ) # Enforcing monotonicity threshold_probability_cube.data = self._make_decreasing( threshold_probability_cube.data ) return threshold_probability_cube
[docs] def _get_ensemble_distributions( self, per_realization_CDF: Cube, forecast: Cube, output_thresholds: ndarray ) -> Cube: """ Interpolate probabilities calculated at model thresholds to extract probabilities at output thresholds for all realizations. Args: per_realization_CDF: Cube containing the CDF probabilities for each ensemble member at model thresholds, with threshold as the first dimension. forecast: Cube containing NWP ensemble forecast. output_thresholds: Sorted array of thresholds at which to calculate the output probabilities. Returns: Cube containing probabilities at output thresholds for all realizations. Dimensions are same as forecast cube with additional threshold dimension first. """ input_probabilities = per_realization_CDF.data output_thresholds = np.array(output_thresholds, dtype=np.float32) bounds_data = get_bounds_of_distribution(forecast.name(), forecast.units) lower_bound = bounds_data[0].astype(np.float32) if (len(self.model_thresholds) == len(output_thresholds)) and np.allclose( self.model_thresholds, output_thresholds ): output_probabilities = np.copy(input_probabilities.data) else: # add lower bound with probability 1 input_probabilities = np.concatenate( [ np.ones((1,) + input_probabilities.shape[1:], dtype=np.float32), input_probabilities, ], axis=0, ) input_thresholds = np.concatenate([[lower_bound], self.model_thresholds]) # reshape to 2 dimensions input_probabilities_2d = np.reshape( input_probabilities, (input_probabilities.shape[0], -1) ) output_probabilities_2d = interpolate_multiple_rows_same_x( output_thresholds, input_thresholds, input_probabilities_2d.transpose() ) output_probabilities = np.reshape( output_probabilities_2d.transpose(), (len(output_thresholds),) + input_probabilities.shape[1:], ) # force interpolated probabilities to be monotone (sometimes they # are not due to small floating-point errors) output_probabilities = self._make_decreasing(output_probabilities) # set probability for lower bound to 1 if np.isclose(output_thresholds[0], lower_bound): output_probabilities[0, :] = 1 # Make output cube probability_cube = self._prepare_threshold_probability_cube( forecast, output_thresholds ) probability_cube.data = output_probabilities.astype(np.float32) return probability_cube
[docs] def process( self, forecast_cube: Cube, feature_cubes: CubeList, output_thresholds: list, threshold_units: str | None = None, ) -> Cube: """Apply rainforests calibration to forecast cube. Ensemble forecasts must be in realization representation. Deterministic forecasts can be processed to produce a pseudo-ensemble; a realization dimension will be added to deterministic forecast cubes if one is not already present. The calibration is done in a situation dependent fashion using a series of decision-tree models to construct representative probability distributions for each input ensemble member which are then blended to give the calibrated distribution for the full ensemble. These distributions are formed in a two-step process: 1. Evaluate CDF defined over the specified model thresholds for each ensemble member. Each threshold exceedance probability is evaluated using the corresponding decision-tree model. 2. Interpolate each ensemble member distribution to the output thresholds. Args: forecast_cube: Cube containing the forecast to be calibrated; must be as realizations. feature_cubes: Cubelist containing the feature variables (physical parameters) used as inputs to the tree-models for the generation of the associated probability distributions. Feature cubes are expected to have the same dimensions as forecast_cube, with the exception of the realization dimension. Where the feature_cube contains a realization dimension this is expected to be consistent, otherwise the cube will be broadcast along the realization dimension. output_thresholds: Set of output thresholds. threshold_units: Units in which output_thresholds are specified. If None, assumed to be the same as forecast_cube. Returns: The calibrated forecast cube. Raises: RuntimeError: If the number of tree models is inconsistent with the number of model thresholds. """ # Check that the correct number of feature variables has been supplied. self._check_num_features(feature_cubes) # Align forecast and feature datasets aligned_features, aligned_forecast = self._align_feature_variables( feature_cubes, forecast_cube ) # Evaluate the CDF using tree models. per_realization_CDF = self._calculate_threshold_probabilities( aligned_forecast, aligned_features ) # convert units of output thresholds if threshold_units: original_threshold_unit = Unit(threshold_units) forecast_unit = forecast_cube.units output_thresholds_in_forecast_units = np.array( [ original_threshold_unit.convert(x, forecast_unit) for x in output_thresholds ] ) else: output_thresholds_in_forecast_units = np.array(output_thresholds) # Calculate probabilities at output thresholds interpolated_per_realization_CDF = self._get_ensemble_distributions( per_realization_CDF, aligned_forecast, output_thresholds_in_forecast_units ) # Average over realizations calibrated_probability_cube = interpolated_per_realization_CDF.collapsed( "realization", MEAN ) calibrated_probability_cube.remove_coord("realization") return calibrated_probability_cube
[docs] class ApplyRainForestsCalibrationTreelite(ApplyRainForestsCalibrationLightGBM): """Class to calibrate input forecast given via RainForests approach using treelite compiled tree models.""" def __new__( cls, model_config_dict: dict[str, dict[str, dict[str, str]]], threads: int | None = None, bin_data: bool = False, ): """Check required dependencies and all model files are available before initialising.""" # Treelite packages must be available to initialise this class import tl2cgen # noqa: F401 import treelite # noqa: F401 # Check that all required files have been specified. ApplyRainForestsCalibration.check_filenames("treelite_model", model_config_dict) return super(ApplyRainForestsCalibration, cls).__new__(cls)
[docs] def __init__( self, model_config_dict: dict[str, dict[str, dict[str, str]]], threads: int | None = None, bin_data: bool = False, ): """Initialise the tree model variables used in the application of RainForests Calibration. Treelite Predictors are used for tree model predictors. Args: model_config_dict: Dictionary containing Rainforests model configuration variables. threads: Number of threads to use during prediction with tree-model objects. If not specified, use the default value used in the TL2cgen library. bin_data: Bin data according to splits used in models. This speeds up prediction if there are many data points which fall into the same bins for all threshold models. Limits the calculation of common feature values by only calculating them once. Dictionary is of format:: { "24": { "0.000010": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, "0.000050": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, "0.000100": { "lightgbm_model": "<path_to_lightgbm_model_object>", "treelite_model": "<path_to_treelite_model_object>", }, } } The keys specify the model threshold value, while the associated values are the path to the corresponding tree-model objects for that threshold. """ from tl2cgen import DMatrix, Predictor sorted_model_config_dict = self._parse_model_config(model_config_dict) self.model_input_converter = DMatrix self.tree_models = {} for lead_time in self.lead_times: # check all lead times have the same thresholds curr_thresholds = np.array([*sorted_model_config_dict[lead_time].keys()]) if np.any(curr_thresholds != self.model_thresholds): raise ValueError("The same thresholds must be used for all lead times.") for threshold in self.model_thresholds: model_filename = Path( os.path.expandvars( str( sorted_model_config_dict[lead_time][threshold].get( "treelite_model" ) ) ) ).expanduser() self.tree_models[lead_time, threshold] = Predictor( # OK for Predictor nthreads to be None here libpath=str(model_filename), verbose=False, nthread=threads, ) self.bin_data = bin_data if self.bin_data: self.combined_feature_splits = self._get_feature_splits(model_config_dict)
[docs] def _get_num_features(self) -> int: return next(iter(self.tree_models.values())).num_feature