Source code for improver.utilities.temporal_interpolation

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Class for Temporal Interpolation calculations."""

from datetime import datetime, timedelta
from typing import List, Optional, Tuple

import iris
import numpy as np
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError
from numpy import ndarray

from improver import BasePlugin
from improver.metadata.constants import FLOAT_DTYPE
from improver.metadata.constants.time_types import TIME_COORDS
from improver.metadata.forecast_times import unify_cycletime
from improver.metadata.utilities import enforce_time_point_standard
from improver.utilities.complex_conversion import complex_to_deg, deg_to_complex
from improver.utilities.cube_manipulation import MergeCubes
from improver.utilities.round import round_close
from improver.utilities.solar import DayNightMask, calc_solar_elevation
from improver.utilities.spatial import lat_lon_determine, transform_grid_to_lat_lon
from improver.utilities.temporal import iris_time_to_datetime


[docs] class TemporalInterpolation(BasePlugin): """ Interpolate data to intermediate times between the validity times of two cubes. This can be used to fill in missing data (e.g. for radar fields) or to ensure data is available at the required intervals when model data is not available at these times. The plugin will return the interpolated times and the later of the two input times. This allows us to modify the input diagnostics if they represent accumulations. The IMPROVER convention is that period diagnostics have their time coordinate point at the end of the period. The later of the two inputs therefore covers the period that has been broken down into shorter periods by the interpolation and, if working with accumulations, must itself be modified. The result of this approach is that in a long run of lead-times, e.g. T+0 to T+120 all the lead-times will be available except T+0. If working with period maximums and minimums we cannot return values in the new periods that do not adhere to the inputs. For example, we might have a 3-hour maximum of 5 ms-1 between 03-06Z. The period before it might have a maximum of 11 ms-1. Upon splitting the 3-hour period into 1-hour periods the gradient might give us the following results: Inputs: 00-03Z: 11 ms-1, 03-06Z: 5 ms-1 Outputs: 03-04Z: 9 ms-1, 04-05Z: 7 ms-1, 05-06Z: 5ms-1 However these outputs are not in agreement with the original 3-hour period maximum of 5 ms-1 over the period 03-06Z. We enforce the maximum from the original period which results in: Inputs: 00-03Z: 10 ms-1, 03-06Z: 5 ms-1 Outputs: 03-04Z: 5 ms-1, 04-05Z: 5 ms-1, 05-06Z: 5ms-1 If instead the preceding period maximum was 2 ms-1 we would use the trend to produce lower maximums in the interpolated 1-hour periods, becoming: Inputs: 00-03Z: 2 ms-1, 03-06Z: 5 ms-1 Outputs: 03-04Z: 3 ms-1, 04-05Z: 4 ms-1, 05-06Z: 5ms-1 This interpretation of the gradient information is retained in the output as it is consistent with the original period maximum of 5 ms-1 between 03-06Z. As such we can impart increasing trends into maximums over periods but not decreasing trends. The counter argument can be made when interpolating minimums in periods, allowing us only to introduce decreasing trends for these. We could use the cell methods to determine whether we are working with accumulations, maximums, or minimums. This should be denoted as a cell method associated with the time coordinate, e.g. for an accumulation it would be `time: sum`, whilst a maximum would have `time: max`. However we cannot guarantee these cell methods are present. As such the interpolation of periods here relies on the user supplying a suitable keyword argument that denotes the type of period being processed. """
[docs] def __init__( self, interval_in_minutes: Optional[int] = None, times: Optional[List[datetime]] = None, interpolation_method: str = "linear", accumulation: bool = False, max: bool = False, min: bool = False, ) -> None: """ Initialise class. Args: interval_in_minutes: Specifies the interval in minutes at which to interpolate between the two input cubes. A number of minutes which does not divide up the interval equally will raise an exception. | e.g. cube_t0 valid at 03Z, cube_t1 valid at 06Z, | interval_in_minutes = 60 --> interpolate to 04Z and 05Z. times: A list of datetime objects specifying the times to which to interpolate. interpolation_method: Method of interpolation to use. Default is linear. Only methods in known_interpolation_methods can be used. accumulation: Set True if the diagnostic being temporally interpolated is a period accumulation. The output will be renormalised to ensure that the total across the period constructed from the shorter intervals matches the total across the period from the coarser intervals. max: Set True if the diagnostic being temporally interpolated is a period maximum. Trends between adjacent input periods will be used to provide variation across the interpolated periods where these are consistent with the inputs. min: Set True if the diagnostic being temporally interpolated is a period minimum. Trends between adjacent input periods will be used to provide variation across the interpolated periods where these are consistent with the inputs. Raises: ValueError: If neither interval_in_minutes nor times are set. ValueError: If both interval_in_minutes and times are both set. ValueError: If interpolation method not in known list. ValueError: If multiple period diagnostic kwargs are set True. ValueError: A period diagnostic is being interpolated with a method not found in the period_interpolation_methods list. """ if interval_in_minutes is None and times is None: raise ValueError( "TemporalInterpolation: One of " "'interval_in_minutes' or 'times' must be set. " "Currently both are none." ) if interval_in_minutes is not None and times is not None: raise ValueError( "TemporalInterpolation: Only one of " "'interval_in_minutes' or 'times' must be set. " "Currently both are set." ) self.interval_in_minutes = interval_in_minutes self.times = times known_interpolation_methods = ["linear", "solar", "daynight"] if interpolation_method not in known_interpolation_methods: raise ValueError( "TemporalInterpolation: Unknown interpolation method {}. ".format( interpolation_method ) ) self.interpolation_method = interpolation_method self.period_inputs = False if np.sum([accumulation, max, min]) > 1: raise ValueError( "Only one type of period diagnostics may be specified: " f"accumulation = {accumulation}, max = {max}, " f"min = {min}" ) self.accumulation = accumulation self.max = max self.min = min if any([accumulation, max, min]): self.period_inputs = True period_interpolation_methods = ["linear"] if self.interpolation_method not in period_interpolation_methods: raise ValueError( "Period diagnostics can only be temporally interpolated " f"using these methods: {period_interpolation_methods}.\n" f"Currently selected method is: {self.interpolation_method}." )
[docs] def construct_time_list( self, initial_time: datetime, final_time: datetime ) -> List[Tuple[str, List[datetime]]]: """ A function to construct a list of datetime objects formatted appropriately for use by iris' interpolation method. Args: initial_time: The start of the period over which a time list is to be constructed. final_time: The end of the period over which a time list is to be constructed. Returns: A list containing a tuple that specifies the coordinate and a list of points along that coordinate to which to interpolate, as required by the iris interpolation method, e.g.:: [('time', [<datetime object 0>, <datetime object 1>])] Raises: ValueError: If list of times provided falls outside the range specified by the initial and final times. ValueError: If the interval_in_minutes does not divide the time range up equally. """ time_list = [] if self.times is not None: self.times = sorted(self.times) if self.times[0] < initial_time or self.times[-1] > final_time: raise ValueError( "List of times falls outside the range given by " "initial_time and final_time. " ) time_list = self.times elif self.interval_in_minutes is not None: if (final_time - initial_time).seconds % ( 60 * self.interval_in_minutes ) != 0: msg = ( "interval_in_minutes of {} does not" " divide into the interval of" " {} mins equally.".format( self.interval_in_minutes, int((final_time - initial_time).seconds / 60), ) ) raise ValueError(msg) time_entry = initial_time while True: time_entry = time_entry + timedelta(minutes=self.interval_in_minutes) if time_entry >= final_time: break time_list.append(time_entry) time_list.append(final_time) time_list = sorted(set(time_list)) return [("time", time_list)]
[docs] @staticmethod def enforce_time_coords_dtype(cube: Cube) -> Cube: """ Enforce the data type of the time, forecast_reference_time and forecast_period within the cube, so that time coordinates do not become mis-represented. The units of the time and forecast_reference_time are enforced to be "seconds since 1970-01-01 00:00:00" with a datatype of int64. The units of forecast_period are enforced to be seconds with a datatype of int32. This functions modifies the cube in-place. Args: cube: The cube that will have the datatype and units for the time, forecast_reference_time and forecast_period coordinates enforced. Returns: Cube where the datatype and units for the time, forecast_reference_time and forecast_period coordinates have been enforced. """ for coord_name in ["time", "forecast_reference_time", "forecast_period"]: coord_spec = TIME_COORDS[coord_name] if cube.coords(coord_name): coord = cube.coord(coord_name) coord.convert_units(coord_spec.units) coord.points = round_close(coord.points, dtype=coord_spec.dtype) if hasattr(coord, "bounds") and coord.bounds is not None: coord.bounds = round_close(coord.bounds, dtype=coord_spec.dtype) return cube
[docs] @staticmethod def calc_sin_phi(dtval: datetime, lats: ndarray, lons: ndarray) -> ndarray: """ Calculate sin of solar elevation Args: dtval: Date and time. lats: Array 2d of latitudes for each point lons: Array 2d of longitudes for each point Returns: Array of sine of solar elevation at each point """ day_of_year = (dtval - datetime(dtval.year, 1, 1)).days utc_hour = (dtval.hour * 60.0 + dtval.minute) / 60.0 sin_phi = calc_solar_elevation( lats, lons, day_of_year, utc_hour, return_sine=True ) return sin_phi
[docs] @staticmethod def calc_lats_lons(cube: Cube) -> Tuple[ndarray, ndarray]: """ Calculate the lats and lons of each point from a non-latlon cube, or output a 2d array of lats and lons, if the input cube has latitude and longitude coordinates. Args: cube: cube containing x and y axis Returns: - 2d Array of latitudes for each point. - 2d Array of longitudes for each point. """ trg_crs = lat_lon_determine(cube) if trg_crs is not None: xycube = next(cube.slices([cube.coord(axis="y"), cube.coord(axis="x")])) lats, lons = transform_grid_to_lat_lon(xycube) else: lats_row = cube.coord("latitude").points lons_col = cube.coord("longitude").points lats = np.repeat(lats_row[:, np.newaxis], len(lons_col), axis=1) lons = np.repeat(lons_col[np.newaxis, :], len(lats_row), axis=0) return lats, lons
[docs] def solar_interpolate(self, diag_cube: Cube, interpolated_cube: Cube) -> CubeList: """ Temporal Interpolation code using solar elevation for parameters (e.g. solar radiation parameters like Downward Shortwave (SW) radiation or UV index) which are zero if the sun is below the horizon and scaled by the sine of the solar elevation angle if the sun is above the horizon. Args: diag_cube: cube containing diagnostic data valid at the beginning of the period and at the end of the period. interpolated_cube: cube containing Linear interpolation of diag_cube at interpolation times in time_list. Returns: A list of cubes interpolated to the desired times. """ interpolated_cubes = CubeList() (lats, lons) = self.calc_lats_lons(diag_cube) prev_data = diag_cube[0].data next_data = diag_cube[1].data dtvals = iris_time_to_datetime(diag_cube.coord("time")) # Calculate sine of solar elevation for cube valid at the # beginning of the period. dtval_prev = dtvals[0] sin_phi_prev = self.calc_sin_phi(dtval_prev, lats, lons) # Calculate sine of solar elevation for cube valid at the # end of the period. dtval_next = dtvals[1] sin_phi_next = self.calc_sin_phi(dtval_next, lats, lons) # Length of time between beginning and end in seconds diff_step = (dtval_next - dtval_prev).seconds for single_time in interpolated_cube.slices_over("time"): # Calculate sine of solar elevation for cube at this # interpolated time. dtval_interp = iris_time_to_datetime(single_time.coord("time"))[0] sin_phi_interp = self.calc_sin_phi(dtval_interp, lats, lons) # Length of time between beginning and interpolated time in seconds diff_interp = (dtval_interp - dtval_prev).seconds # Set all values to 0.0, to be replaced # with values calculated through this solar method. single_time.data[:] = 0.0 sun_up = np.where(sin_phi_interp > 0.0) # Solar value is calculated only for points where the sun is up # and is a weighted combination of the data using the sine of # solar elevation and the data in the diag_cube valid # at the beginning and end. # If the diag_cube containing data valid at the # beginning of the period and at the end of the period # has more than x and y coordinates # the calculation needs to adapted to accommodate this. if len(single_time.shape) > 2: prevv = prev_data[..., sun_up[0], sun_up[1]] / sin_phi_prev[sun_up] nextv = next_data[..., sun_up[0], sun_up[1]] / sin_phi_next[sun_up] single_time.data[..., sun_up[0], sun_up[1]] = sin_phi_interp[sun_up] * ( prevv + (nextv - prevv) * (diff_interp / diff_step) ) else: prevv = prev_data[sun_up] / sin_phi_prev[sun_up] nextv = next_data[sun_up] / sin_phi_next[sun_up] single_time.data[sun_up] = sin_phi_interp[sun_up] * ( prevv + (nextv - prevv) * (diff_interp / diff_step) ) # cube with new data added to interpolated_cubes cube List. interpolated_cubes.append(single_time) return interpolated_cubes
[docs] @staticmethod def daynight_interpolate(interpolated_cube: Cube) -> CubeList: """ Set linearly interpolated data to zero for parameters (e.g. solar radiation parameters) which are zero if the sun is below the horizon. Args: interpolated_cube: cube containing Linear interpolation of cube at interpolation times in time_list. Returns: A list of cubes interpolated to the desired times. """ daynightplugin = DayNightMask() daynight_mask = daynightplugin(interpolated_cube) index = daynight_mask.data == daynightplugin.night # Reshape the time, y, x mask to match the input which may include addtional # dimensions, such as realization. dropped_crds = [ crd for crd in interpolated_cube.coords(dim_coords=True) if crd not in daynight_mask.coords(dim_coords=True) ] if dropped_crds: cslices = interpolated_cube.slices_over(dropped_crds) masked_data = CubeList() for cslice in cslices: cslice.data[index] = 0.0 masked_data.append(cslice) interpolated_cube = masked_data.merge_cube() else: interpolated_cube.data[index] = 0.0 return CubeList(list(interpolated_cube.slices_over("time")))
[docs] @staticmethod def add_bounds(cube_t0: Cube, interpolated_cube: Cube): """Calcualte bounds using the interpolated times and the time taken from cube_t0. This function is used rather than iris's guess bounds method as we want to use the earlier time cube to inform the lowest bound. The interpolated_cube `crd` is modified in place. Args: cube_t0: The input cube corresponding to the earlier time. interpolated_cube: The cube containing the interpolated times, which includes the data corresponding to the time of the later of the two input cubes. Raises: CoordinateNotFoundError: if time or forecast_period coordinates are not present on the input cubes. """ for crd in ["time", "forecast_period"]: try: interpolated_times = np.concatenate( [cube_t0.coord(crd).points, interpolated_cube.coord(crd).points] ) except CoordinateNotFoundError: raise CoordinateNotFoundError( f"Period diagnostic cube is missing expected coordinate: {crd}" ) all_bounds = [] for start, end in zip(interpolated_times[:-1], interpolated_times[1:]): all_bounds.append([start, end]) interpolated_cube.coord(crd).bounds = all_bounds
[docs] @staticmethod def _calculate_accumulation( cube_t0: Cube, period_reference: Cube, interpolated_cube: Cube ): """If the input is an accumulation we use the trapezium rule to calculate a new accumulation for each output period from the rates we converted the accumulations to prior to interpolating. We then renormalise to ensure the total accumulation across the period is unchanged by expressing it as a series of shorter periods. The interpolated cube is modified in place. Args: cube_t0: The input cube corresponding to the earlier time. period_reference: The input cube corresponding to the later time, with the values prior to conversion to rates. interpolated_cube: The cube containing the interpolated times, which includes the data corresponding to the time of the later of the two input cubes. """ # Calculate an average rate for the period from the edges accumulation_edges = [cube_t0, *interpolated_cube.slices_over("time")] period_rates = np.array( [ (a.data + b.data) / 2 for a, b in zip(accumulation_edges[:-1], accumulation_edges[1:]) ] ) interpolated_cube.data = period_rates # Multiply the average rate by the length of each period to get a new # accumulation. new_periods = np.diff(interpolated_cube.coord("forecast_period").bounds) for _ in range(interpolated_cube.ndim - new_periods.ndim): new_periods = np.expand_dims(new_periods, axis=1) interpolated_cube.data = np.multiply(new_periods, interpolated_cube.data) # Renormalise the total of the new periods to ensure it matches the # total expressed in the longer original period. (time_coord,) = interpolated_cube.coord_dims("time") interpolated_total = np.sum(interpolated_cube.data, axis=time_coord) renormalisation = period_reference.data / interpolated_total interpolated_cube.data *= renormalisation interpolated_cube.data = interpolated_cube.data.astype(FLOAT_DTYPE)
[docs] def process(self, cube_t0: Cube, cube_t1: Cube) -> CubeList: """ Interpolate data to intermediate times between validity times of cube_t0 and cube_t1. Args: cube_t0: A diagnostic cube valid at the beginning of the period within which interpolation is to be permitted. cube_t1: A diagnostic cube valid at the end of the period within which interpolation is to be permitted. Returns: A list of cubes interpolated to the desired times. Raises: TypeError: If cube_t0 and cube_t1 are not of type iris.cube.Cube. ValueError: A mix of instantaneous and period diagnostics have been used as inputs. ValueError: A period type has been declared but inputs are not period diagnostics. ValueError: Period diagnostics with overlapping periods. CoordinateNotFoundError: The input cubes contain no time coordinate. ValueError: Cubes contain multiple validity times. ValueError: The input cubes are ordered such that the initial time cube has a later validity time than the final cube. """ if not isinstance(cube_t0, iris.cube.Cube) or not isinstance( cube_t1, iris.cube.Cube ): msg = ( "Inputs to TemporalInterpolation are not of type " "iris.cube.Cube, first input is type " "{}, second input is type {}".format(type(cube_t0), type(cube_t1)) ) raise TypeError(msg) try: (initial_time,) = iris_time_to_datetime(cube_t0.coord("time")) (final_time,) = iris_time_to_datetime(cube_t1.coord("time")) except CoordinateNotFoundError: msg = "Cube provided to TemporalInterpolation contains no time coordinate." raise CoordinateNotFoundError(msg) except ValueError: msg = ( "Cube provided to TemporalInterpolation contains multiple " "validity times, only one expected." ) raise ValueError(msg) if initial_time > final_time: raise ValueError( "TemporalInterpolation input cubes " "ordered incorrectly" ", with the final time being before the initial " "time." ) cube_t0_bounds = cube_t0.coord("time").has_bounds() cube_t1_bounds = cube_t1.coord("time").has_bounds() if cube_t0_bounds + cube_t1_bounds == 1: raise ValueError( "Period and non-period diagnostics cannot be combined for" " temporal interpolation." ) if cube_t0_bounds and not self.period_inputs: raise ValueError( "Interpolation of period diagnostics should be done using " "the appropriate period specifier (accumulation, min or max)." ) if self.period_inputs: # Declaring period type requires the inputs be period diagnostics. if not cube_t0_bounds: raise ValueError( "A period method has been declared for temporal " "interpolation (max, min, or accumulation). Period " "diagnostics must be provided. The input cubes have no " "time bounds." ) cube_interval = ( cube_t1.coord("time").points[0] - cube_t0.coord("time").points[0] ) (period,) = np.diff(cube_t1.coord("time").bounds[0]) if not cube_interval == period: raise ValueError( "The diagnostic provided represents the period " f"{period / 3600} hours. The interval between the " f"diagnostics is {cube_interval / 3600} hours. Temporal " "interpolation can only be applied to a period " "diagnostic provided at intervals that match the " "diagnostic period such that all points in time are " "captured by only one of the inputs and do not overlap." ) time_list = self.construct_time_list(initial_time, final_time) # If the target output time is the same as the time at which the # trailing input is valid, just return it unchanged. if ( len(time_list[0][1]) == 1 and time_list[0][1][0] == cube_t1.coord("time").cell(0).point ): return CubeList([cube_t1]) # If the units of the two cubes are degrees, assume we are dealing with # directions. Convert the directions to complex numbers so # interpolations (esp. the 0/360 wraparound) are handled in a sane # fashion. if cube_t0.units == "degrees" and cube_t1.units == "degrees": cube_t0.data = deg_to_complex(cube_t0.data) cube_t1.data = deg_to_complex(cube_t1.data) # Convert accumulations into rates to allow interpolation using trends # in the data and to accommodate non-uniform output intervals. This also # accommodates cube_t0 and cube_t1 representing different periods of # accumulation, for example where the forecast period interval changes # in an NWP model's output. if self.accumulation: cube_t0.data /= np.diff(cube_t0.coord("forecast_period").bounds[0])[0] period_reference = cube_t1.copy() cube_t1.data /= np.diff(cube_t1.coord("forecast_period").bounds[0])[0] cubes = CubeList([cube_t0, cube_t1]) cube = MergeCubes()(cubes) interpolated_cube = cube.interpolate(time_list, iris.analysis.Linear()) if cube_t0.units == "degrees" and cube_t1.units == "degrees": interpolated_cube.data = complex_to_deg(interpolated_cube.data) if self.period_inputs: # Add bounds to the time coordinates of the interpolated outputs # if the inputs were period diagnostics. self.add_bounds(cube_t0, interpolated_cube) # Apply suitable constraints to the returned values. # - accumulations are renormalised to ensure the period total is # unchanged when broken into shorter periods. # - period maximums are enforced to not exceed the original # maximum that occurred across the whole longer period. # - period minimums are enforced to not be below the original # minimum that occurred across the whole longer period. if self.accumulation: self._calculate_accumulation( cube_t0, period_reference, interpolated_cube ) elif self.max: interpolated_cube.data = np.minimum( cube_t1.data, interpolated_cube.data ) elif self.min: interpolated_cube.data = np.maximum( cube_t1.data, interpolated_cube.data ) self.enforce_time_coords_dtype(interpolated_cube) interpolated_cubes = CubeList() if self.interpolation_method == "solar": interpolated_cubes = self.solar_interpolate(cube, interpolated_cube) elif self.interpolation_method == "daynight": interpolated_cubes = self.daynight_interpolate(interpolated_cube) else: for single_time in interpolated_cube.slices_over("time"): interpolated_cubes.append(single_time) return interpolated_cubes
[docs] class DurationSubdivision: """Subdivide a duration diagnostic, e.g. sunshine duration, into shorter periods, optionally applying a night mask to ensure that quantities defined only in the day or night are not spread into night or day periods respectively. This is a very simple approach. In the case of sunshine duration the duration is divided up evenly across the short periods defined by the fidelity argument. These are then optionally masked to zero for chosen periods (day or night). Values in the non-zeroed periods are then renormalised relative to the original period total, such that the total across the whole period ought to equal the original. This is not always possible as the night mask applied is simpler than e.g. the radiation scheme impact on a 3D orography. As such the renormalisation could yield durations longer than the fidelity period in each non-zeroed period as it tries to allocate e.g. 5 hours of sunlight across 4 non-zeroed hours. This is not physical, so the renormalisation is partnered with a clip that limits the duration allocated to the renormalised periods to not exceed their length. The result of this is that the original sunshine durations cannot be recovered for points that are affected. Instead the calculated night mask is limiting the accuracy to allow the subdivision to occur. This is the cost of this method. Note that this method cannot account for any weather impacts e.g. cloud that is affecting the sunshine duration in a period. If a 6-hour period is split into three 2-hour periods the split will be even regardless of when thick cloud might occur. """
[docs] def __init__( self, target_period: int, fidelity: int, night_mask: bool = True, day_mask: bool = False, ): """Define the length of the target periods to be constructed and the intermediate fidelity. This fidelity is the length of the shorter periods into which the data is split and from which the target periods are constructed. A shorter fidelity period allows the time dependent day or night masks to be applied more accurately. Args: target_period: The time period described by the output cubes in seconds. The data will be reconstructed into non-overlapping periods. The target_period must be a factor of the original period. fidelity: The shortest increment in seconds into which the input periods are divided and to which the night mask is applied. The target periods are reconstructed from these shorter periods. Shorter fidelity periods better capture where the day / night discriminator falls. night_mask: If true, points that fall at night are zeroed and duration reallocated to day time periods as much as possible. day_mask: If true, points that fall in the day time are zeroed and duration reallocated to night time periods as much as possible. Raises: ValueError: If target_period and / or fidelity are not positive integers. ValueError: If day and night mask options are both set True. """ for item in [target_period, fidelity]: if item <= 0: raise ValueError( "Target period and fidelity must be a positive integer " "numbers of seconds. Currently set to " f"target_period: {target_period}, fidelity: {fidelity}" ) self.target_period = target_period self.fidelity = fidelity if night_mask and day_mask: raise ValueError( "Only one or neither of night_mask and day_mask may be set to True" ) elif not night_mask and not day_mask: self.mask_value = None else: self.mask_value = 0 if night_mask else 1
[docs] @staticmethod def cube_period(cube: Cube) -> int: """Return the time period of the cube in seconds. Args: cube: The cube for which the period is to be returned. Return: period: Period of cube time coordinate in seconds. """ (period,) = np.diff(cube.coord("time").bounds[0]) return period
[docs] def allocate_data(self, cube: Cube, period: int) -> Cube: """Allocate fractions of the original cube duration diagnostic to shorter fidelity periods with metadata that describes these shorter periods appropriately. The fidelity period cubes will be merged to form a cube with a longer time dimension. This cube will be returned and used elsewhere to construct the target period cubes. Args: cube: The original period cube from which duration data will be taken and divided up. period: The period of the input cube in seconds. Returns: A cube, with a time dimension, that contains the subdivided data. """ # Split the whole period duration into allocations for each fidelity # period. intervals = period // self.fidelity interval_data = (cube.data / intervals).astype(cube.data.dtype) daynightplugin = DayNightMask() start_time, _ = cube.coord("time").bounds.flatten() interpolated_cubes = iris.cube.CubeList() for i in range(intervals): interval_cube = cube.copy(data=interval_data.copy()) interval_start = start_time + i * self.fidelity interval_end = start_time + (i + 1) * self.fidelity interval_cube.coord("time").points = np.array( [interval_end], dtype=np.int64 ) interval_cube.coord("time").bounds = np.array( [[interval_start, interval_end]], dtype=np.int64 ) if self.mask_value is not None: daynight_mask = daynightplugin(interval_cube).data daynight_mask = np.broadcast_to(daynight_mask, interval_cube.shape) interval_cube.data[daynight_mask == self.mask_value] = 0.0 interpolated_cubes.append(interval_cube) return interpolated_cubes.merge_cube()
[docs] @staticmethod def renormalisation_factor(cube: Cube, fidelity_period_cube: Cube) -> np.ndarray: """Sum up the total of the durations distributed amongst the fidelity period cubes following the application of any masking. These are then used with the durations in the unsubdivided original data to calculate a factor to restore the correct totals; note that where clipping plays a role the original totals may not be restored. Args: cube: The original period cube of duration data. fidelity_period_cube: The cube of fidelity period durations (the original durations divided up into shorter fidelity periods). Returns: factor: An array of factors that can be used to multiply up the fidelity period durations such that when the are summed up they are equal to the original durations. """ retotal = fidelity_period_cube.collapsed("time", iris.analysis.SUM) factor = cube.data / retotal.data # Masked points indicate divide by 0, set these points to 0. Also handle # a case in which there is no masking on the factor array. try: factor = factor.filled(0) except AttributeError: factor[factor == np.inf] = 0 return factor
[docs] def construct_target_periods(self, fidelity_period_cube: Cube) -> Cube: """Combine the short fidelity period cubes into cubes that describe the target period. Args: fidelity_period_cube: The short fidelity period cubes from which the target periods are constructed. Returns: A cube containing the target period data with a time dimension with an entry for each target period. These periods combined span the original cube's period. """ new_period_cubes = iris.cube.CubeList() interval = timedelta(seconds=self.target_period) start_time = fidelity_period_cube.coord("time").cell(0).bound[0] end_time = fidelity_period_cube.coord("time").cell(-1).bound[-1] while start_time < end_time: period_constraint = iris.Constraint( time=lambda cell: start_time <= cell.bound[0] < start_time + interval ) components = fidelity_period_cube.extract(period_constraint) component_cube = components.collapsed("time", iris.analysis.SUM) enforce_time_point_standard(component_cube) new_period_cubes.append(component_cube) start_time += interval # The cycle times are already the same. This code will recalculate # the forecasts periods relative to the cycletime for each of our # extracted shorter duration cubes. cycle_time = fidelity_period_cube.coord("forecast_reference_time").cell(0).point new_period_cubes = unify_cycletime(new_period_cubes, cycle_time) return new_period_cubes.merge_cube()
[docs] def process(self, cube: Cube) -> Cube: """Create target period duration diagnostics from the original duration diagnostic data. Args: cube: The original duration diagnostic cube. Returns: A cube containing the target period data with a time dimension with an entry for each period. These periods combined span the original cube's period. Raises: ValueError: The target period is not a factor of the input period. ValueError: The fidelity period is not less than or equal to the target period. """ period = self.cube_period(cube) # If the input cube period matches the target period return it. if period == self.target_period: return cube if period / self.target_period % 1 != 0: raise ValueError( "The target period must be a factor of the original period " "of the input cube and the target period must be <= the input " "period. " f"Input period: {period}, target period: {self.target_period}" ) if self.fidelity > self.target_period: raise ValueError( "The fidelity period must be less than or equal to the " "target period." ) # Ensure that the cube is already self-consistent and does not include # any durations that exceed the period described. This is mostly to # handle grib packing errors for ECMWF data. cube.data = np.clip(cube.data, 0, period, dtype=cube.data.dtype) fidelity_period_cube = self.allocate_data(cube, period) factor = self.renormalisation_factor(cube, fidelity_period_cube) # Apply clipping to limit these values to the maximum possible # duration that can be contained within the period. fidelity_period_cube = fidelity_period_cube.copy( data=np.clip(fidelity_period_cube.data * factor, 0, self.fidelity) ) return self.construct_target_periods(fidelity_period_cube)