#!/usr/bin/env python
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Script to load and apply the trained Quantile Regression Random Forest (QRF)
model."""
import warnings
from typing import Optional
import iris
import numpy as np
import pandas as pd
from iris.cube import Cube, CubeList
from iris.pandas import as_data_frame
from improver import PostProcessingPlugin
from improver.calibration import add_static_feature_from_cube_to_df, add_warning_comment
from improver.calibration.quantile_regression_random_forest import (
ApplyQuantileRegressionRandomForests,
quantile_forest_package_available,
)
from improver.ensemble_copula_coupling.utilities import choose_set_of_percentiles
from improver.utilities.cube_checker import assert_spatial_coords_match
from improver.utilities.temporal import datetime_to_iris_time
try:
from quantile_forest import RandomForestQuantileRegressor
except ModuleNotFoundError:
# Define empty class to avoid type hint errors.
[docs]
class RandomForestQuantileRegressor:
pass
iris.FUTURE.pandas_ndim = True
[docs]
class PrepareAndApplyQRF(PostProcessingPlugin):
"""Prepare the input forecast for application of a trained Quantile Regression
Random Forest (QRF) model and apply the QRF model."""
[docs]
def __init__(
self,
feature_config: dict[str, list[str]],
target_cf_name: str,
unique_site_id_keys: list[str] = ["wmo_id"],
cycletime: Optional[str] = None,
forecast_period: Optional[int] = None,
):
"""Initialise the plugin.
Args:
feature_config (dict):
Feature configuration defining the features to be used for quantile
regression. The configuration is a dictionary of strings, where the
keys are the names of the input cube(s) supplied, and the values are
a list. This list can contain both computed features, such as the mean
or standard deviation (std), or static features, such as the altitude.
The computed features will be computed using the cube defined in the
dictionary key. If the key is the feature itself e.g. a distance to
water cube, then the value should state "static". This will ensure
the cube's data is used as the feature.
The config will have the structure:
"DYNAMIC_VARIABLE_CF_NAME": ["FEATURE1", "FEATURE2"] e.g:
{
"air_temperature": ["mean", "std", "altitude"],
"visibility_at_screen_level": ["mean", "std"]
"distance_to_water": ["static"],
}
target_cf_name (str):
A string containing the CF name of diagnostic to be calibrated. This
will be used to separate it from the rest of the dynamic predictors,
if present.
unique_site_id_keys (list):
The names of the coordinates that uniquely identify each site,
e.g. "wmo_id" or ["latitude", "longitude"].
cycletime (str):
The cycle time of the forecast to be calibrated in the format
YYYYMMDDTHHMMZ. If not provided, the cycle time found in the first
forecast cube will be used.
forecast_period (int):
The forecast period of the forecast to be calibrated in seconds. If not
provided, the forecast period found in the first forecast cube
will be used.
"""
self.feature_config = feature_config
self.target_cf_name = target_cf_name
self.unique_site_id_keys = unique_site_id_keys
self.cycletime = cycletime
self.forecast_period = forecast_period
self.quantile_forest_installed = quantile_forest_package_available()
[docs]
@staticmethod
def _compute_quantile_list(forecast_cube: Cube, coord: str) -> list[float]:
"""Compute the list of quantiles e.g. 0.25, 0.5, 0.75 that will be produced
from a specified coordinate on the forecast cube.
Args:
forecast_cube: Forecast to be calibrated.
coord: Coordinate name. The length of the coordinate will be used to
determine the number of quantiles to compute.
Returns:
List of quantiles (e.g. 0.25, 0.5, 0.75) computed from the forecast cube.
"""
n_percentiles = len(forecast_cube.coord(coord).points)
quantiles = (np.array(choose_set_of_percentiles(n_percentiles)) / 100).tolist()
return quantiles
[docs]
def _update_forecast_reference_time_and_period(
self, cube_inputs: CubeList
) -> CubeList:
"""Update the forecast_reference_time and forecast_period coordinates
on the input cubes to match those provided, if they are provided. The rebadging
of the forecast_period introduces a slight discrepancy between the forecasts
used for training and application of the QRF model. However, as any forecast
period rebadging is expected to be small (e.g. a few hours), this is not
expected to be a significant issue.
Args:
cube_inputs: List of cubes containing the features and the forecast to be
calibrated.
Returns:
CubeList of the input cubes with updated forecast_reference_time and
forecast_period coordinates, if they were provided.
"""
if self.cycletime:
cycletime = datetime_to_iris_time(
pd.to_datetime(self.cycletime, format="%Y%m%dT%H%MZ")
)
else:
(cycletime,) = cube_inputs[0].coord("forecast_reference_time").points
if self.forecast_period:
forecast_period = self.forecast_period
else:
(forecast_period,) = cube_inputs[0].coord("forecast_period").points
# Update the forecast_reference_time and forecast_period to match those
# provided, if they are provided.
for cube in cube_inputs:
for coord_name in ["blend_time", "forecast_reference_time"]:
if coord_name in [coord.name() for coord in cube.coords()]:
cube.coord(coord_name).points = cycletime
if "forecast_period" in [coord.name() for coord in cube.coords()]:
cube.coord("forecast_period").points = forecast_period
if cube.coord("forecast_period").bounds is not None:
diff = (
cube.coord("forecast_period").bounds[0][1]
- cube.coord("forecast_period").bounds[0][0]
)
cube.coord("forecast_period").bounds = np.array(
[[forecast_period - diff, forecast_period]]
)
return cube_inputs
[docs]
def _cube_to_dataframe(self, cube_inputs: CubeList) -> pd.DataFrame:
"""Convert cube inputs to a pandas DataFrame.
Args:
cube_inputs: List of cubes containing the features and the forecast to be
calibrated.
Returns:
DataFrame containing the data from the cubes, with auxiliary coordinates
included as columns.
"""
# Convert the first cube to a DataFrame.
df = as_data_frame(cube_inputs[0], add_aux_coords=True).reset_index()
possible_columns = [
*self.unique_site_id_keys,
"time",
"forecast_reference_time",
"forecast_period",
"percentile",
"realization",
]
# Iteratively convert remaining cubes to DataFrame and merge.
for cube in cube_inputs[1:]:
df = add_static_feature_from_cube_to_df(
df,
cube,
cube.name(),
possible_columns,
)
for column in ["forecast_reference_time", "time"]:
df[column] = df[column].apply(lambda x: x._to_real_datetime())
return df
[docs]
def process(
self,
cube_inputs: CubeList,
qrf_descriptors: Optional[
tuple[RandomForestQuantileRegressor, str, float]
] = None,
) -> Cube:
"""Load and apply the trained Quantile Regression Random Forest (QRF) model.
The model is used to calibrated the forecast provided. The calibrated forecast
is written to a cube. If no model is provided the input forecast is returned
unchanged.
Args:
cube_inputs: List of cubes containing the features and the forecast to be
calibrated.
qrf_descriptors: The trained QRF model to be applied to the forecast
and the transformation and pre-transform addition applied during
training. The descriptors expected are a tuple of:
(qrf_model, transformation, pre_transform_addition).
Returns:
iris.cube.Cube:
The calibrated forecast cube.
"""
if qrf_descriptors is None:
# If no descriptors are provided, return the input forecast with a warning.
# Descriptors expected: (qrf_model, transformation, pre_transform_addition)
qrf_descriptors = (None, None, 0)
qrf_model, transformation, pre_transform_addition = qrf_descriptors
cube_inputs, forecast_cube = self._get_inputs(cube_inputs, qrf_model=qrf_model)
if cube_inputs:
assert_spatial_coords_match(cube_inputs)
if not self.quantile_forest_installed or not qrf_model:
msg = "Unable to apply Quantile Regression Random Forest model."
if not self.quantile_forest_installed:
msg += " The 'quantile_forest' package is not installed."
elif not qrf_model:
msg += " No trained model has been provided."
msg += " Returning the input forecast without calibration."
warnings.warn(msg)
return forecast_cube
template_forecast_cube = forecast_cube.copy()
if forecast_cube.coords("realization"):
quantile_list = self._compute_quantile_list(
forecast_cube.copy(), "realization"
)
elif forecast_cube.coords("percentile"):
quantile_list = (forecast_cube.coord("percentile").points / 100.0).tolist()
cube_inputs = self._update_forecast_reference_time_and_period(cube_inputs)
df = self._cube_to_dataframe(cube_inputs)
calibrated_forecast = ApplyQuantileRegressionRandomForests(
target_name=self.target_cf_name,
feature_config=self.feature_config,
quantiles=quantile_list,
transformation=transformation,
pre_transform_addition=pre_transform_addition,
unique_site_id_keys=self.unique_site_id_keys,
)(qrf_model, df)
calibrated_forecast_cube = template_forecast_cube.copy(
data=np.broadcast_to(calibrated_forecast.T, template_forecast_cube.shape)
)
return calibrated_forecast_cube