Source code for improver.nowcasting.utilities

#!/usr/bin/env python
# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module with utilities required for nowcasting."""

from typing import List, Union

import iris
import numpy as np
from cf_units import Unit
from iris.cube import Cube, CubeList

from improver import BasePlugin
from improver.utilities import neighbourhood_tools
from improver.utilities.temporal import (
    extract_nearest_time_point,
    iris_time_to_datetime,
)


[docs] class ExtendRadarMask(BasePlugin): """Extend the mask on radar rainrate data based on the radar coverage composite"""
[docs] def __init__(self) -> None: """ Initialise with known values of the coverage composite for which radar data is valid. All other areas will be masked. """ self.coverage_valid = [1, 2]
[docs] def process(self, radar_data: Cube, coverage: Cube) -> Cube: """ Update the mask on the input rainrate cube to reflect where coverage is valid Args: radar_data: Radar data with mask corresponding to radar domains coverage: Radar coverage data containing values: 0: outside composite 1: precip detected 2: precip not detected & 1/32 mm/h detectable at this range 3: precip not detected & 1/32 mm/h NOT detectable Returns: Radar data with mask extended to mask out regions where 1/32 mm/h are not detectable """ # check cube coordinates match for crd in radar_data.coords(): if coverage.coord(crd.name()) != crd: raise ValueError( "Rain rate and coverage composites unmatched " "- coord {}".format( crd.name() ) ) # accommodate data from multiple times radar_data_slices = radar_data.slices( [radar_data.coord(axis="y"), radar_data.coord(axis="x")] ) coverage_slices = coverage.slices( [coverage.coord(axis="y"), coverage.coord(axis="x")] ) cube_list = iris.cube.CubeList() for rad, cov in zip(radar_data_slices, coverage_slices): # create a new mask that is False wherever coverage is valid new_mask = ~np.isin(cov.data, self.coverage_valid) # remask rainrate data remasked_data = np.ma.MaskedArray(rad.data.data, mask=new_mask) cube_list.append(rad.copy(remasked_data)) return cube_list.merge_cube()
[docs] class FillRadarHoles(BasePlugin): """Fill in small "no data" regions in the radar composite by interpolating in log rainrate space. The log-linear transformation does not preserve non-zero rainrates of less than 0.001 mm/h. Since the radar composite encodes trace rain rates with a value of 0.03 mm/h, this should not have any effect on "real" data from the Met Office. """ MIN_RR_MMH = 0.001
[docs] def __init__(self) -> None: """Initialise parameters of interpolation The constants defining neighbourhood size and proportion of neighbouring masked pixels for speckle identification have been empirically tuned for UK radar data. As configured, this method will flag "holes" of up to 24 pixels in size (30% of a 9 x 9 neighbourhood). The radius used to interpolate data into these holes has been chosen to match these constants, by defining the smallest radius that ensures there will always be valid data in the neighbourhood (25 pixels) over which averaging is performed. """ # shape of neighbourhood over which to search for masked neighbours self.r_speckle = 4 self.window_shape = ((self.r_speckle * 2) + 1, (self.r_speckle * 2) + 1) # proportion of masked neighbours below which a pixel is considered to # be isolated "speckle", which can be filled in by interpolation p_masked = 0.3 # number of masked neighbours in neighbourhood self.max_masked_values = self.window_shape[0] * self.window_shape[1] * p_masked # radius of neighbourhood from which to calculate interpolated values self.r_interp = 2
[docs] def _find_and_interpolate_speckle(self, cube: Cube) -> None: """Identify and interpolate "speckle" points, where "speckle" is defined as areas of "no data" that are small enough to fill by interpolation without affecting data integrity. We would not wish to interpolate large areas as this may give false confidence in "no precipitation", where in fact precipitation exists in a "no data" region. Masked pixels near the borders of the input data array are not considered for interpolation. Args: cube: Cube containing rainrates (mm/h). Data modified in place. """ mask_windows = neighbourhood_tools.pad_and_roll( cube.data.mask, self.window_shape, mode="constant", constant_values=1 ) data_windows = neighbourhood_tools.pad_and_roll( cube.data, self.window_shape, mode="constant", constant_values=np.nan ) # find indices of "speckle" pixels indices = np.where( (mask_windows[..., self.r_speckle, self.r_speckle] == 1) & (np.sum(mask_windows, axis=(-2, -1)) < self.max_masked_values) ) # average data from the 5x5 nbhood around each "speckle" point bounds = slice( self.r_speckle - self.r_interp, self.r_speckle + self.r_interp + 1 ) data = data_windows[indices][..., bounds, bounds] mask = mask_windows[indices][..., bounds, bounds] for row_ind, col_ind, data_win, mask_win in zip(*indices, data, mask): valid_points = data_win[mask_win == 0] mean = np.mean( np.where(valid_points > self.MIN_RR_MMH, np.log10(valid_points), np.nan) ) # when data value is set, mask is removed at that point if np.isnan(mean): cube.data[row_ind, col_ind] = 0 else: cube.data[row_ind, col_ind] = np.power(10, mean)
[docs] def process(self, masked_radar: Cube) -> Cube: """ Fills in and unmasks small "no data" regions within the radar composite, to minimise gaps in the extrapolation nowcast. Args: masked_radar: A masked cube of radar precipitation rates Returns: A masked cube with continuous coverage over the radar composite domain, where missing data has been interpolated """ # extract precipitation rate data in mm h-1 masked_radar_mmh = masked_radar.copy() masked_radar_mmh.convert_units("mm h-1") # fill "holes" in data by interpolation self._find_and_interpolate_speckle(masked_radar_mmh) # return new cube in original units masked_radar_mmh.convert_units(masked_radar.units) return masked_radar_mmh
[docs] class ApplyOrographicEnhancement(BasePlugin): """Apply orographic enhancement to precipitation rate input, either to add or subtract an orographic enhancement component."""
[docs] def __init__(self, operation: str) -> None: """Initialise class. Args: operation: Operation ("add" or "subtract") to apply to the incoming cubes. Raises: ValueError: Operation not supported. """ # A minimum precipitation rate in mm/h that will be used as a lower # precipitation rate threshold. self.min_precip_rate_mmh = 1 / 32.0 self.operation = operation
def __repr__(self) -> str: """Represent the configured plugin instance as a string.""" result = "<ApplyOrographicEnhancement: operation: {}>" return result.format(self.operation)
[docs] @staticmethod def _select_orographic_enhancement_cube( precip_cube: Cube, oe_cube: Cube, allowed_time_diff: int = 1800 ) -> Cube: """Select the orographic enhancement cube with the required time coordinate. Args: precip_cube: Cube containing the input precipitation fields. oe_cube: Cube containing orographic enhancement fields at one or more times. allowed_time_diff: The maximum permitted difference, in integer seconds, between the datetime of the precipitation cube and the time points available within the orographic enhancement cube. If this limit is exceeded, then an error is raised. Returns: Cube containing the orographic enhancement field at the required time. Raises: ValueError: If required time step is not available within tolerance (in theory. In practise, the tolerance is left as the default None, which matches ANY available field regardless of time offset. So this error will never be thrown.) """ (time_point,) = iris_time_to_datetime(precip_cube.coord("time").copy()) oe_cube_slice = extract_nearest_time_point( oe_cube, time_point, allowed_dt_difference=allowed_time_diff ) return oe_cube_slice
[docs] def _apply_orographic_enhancement(self, precip_cube: Cube, oe_cube: Cube) -> Cube: """Combine the precipitation rate cube and the orographic enhancement cube. Args: precip_cube: Cube containing the input precipitation field. oe_cube: Cube containing the orographic enhancement field matching the validity time of the precipitation cube. Returns: Cube containing the precipitation rate field modified by the orographic enhancement cube. """ # Convert orographic enhancement into the units of the precipitation # rate cube. oe_cube.convert_units(precip_cube.units) # Set orographic enhancement to be zero for points with a # precipitation rate of < 1/32 mm/hr. original_units = Unit("mm/hr") threshold_in_cube_units = original_units.convert( self.min_precip_rate_mmh, precip_cube.units ) # Ignore invalid warnings generated if e.g. a NaN is encountered # within the less than (<) comparison. with np.errstate(invalid="ignore"): oe_cube.data[precip_cube.data < threshold_in_cube_units] = 0.0 # Add / subtract orographic enhancement where data is not masked cube = precip_cube.copy() if self.operation == "add": cube.data = cube.data + oe_cube.data elif self.operation == "subtract": cube.data = cube.data - oe_cube.data else: msg = ( "Operation '{}' not supported for combining " "precipitation rate and " "orographic enhancement.".format(self.operation) ) raise ValueError(msg) return cube
[docs] def _apply_minimum_precip_rate(self, precip_cube: Cube, cube: Cube) -> Cube: """Ensure that negative precipitation rates are capped at the defined minimum precipitation rate. Args: precip_cube: Cube containing a precipitation rate input field. cube: Cube containing the precipitation rate field after combining with orographic enhancement. Returns: Cube containing the precipitation rate field where any negative precipitation rates have been capped at the defined minimum precipitation rate. """ if self.operation == "subtract": original_units = Unit("mm/hr") threshold_in_cube_units = original_units.convert( self.min_precip_rate_mmh, cube.units ) threshold_in_precip_cube_units = original_units.convert( self.min_precip_rate_mmh, precip_cube.units ) # Ignore invalid warnings generated if e.g. a NaN is encountered # within the less than (<) comparison. with np.errstate(invalid="ignore"): # Create a mask computed from where the input precipitation # cube is greater or equal to the threshold and the result # of combining the precipitation rate input cube with the # orographic enhancement has generated a cube with # precipitation rates less than the threshold. mask = (precip_cube.data >= threshold_in_precip_cube_units) & ( cube.data <= threshold_in_cube_units ) # Set any values lower than the threshold to be equal to # the minimum precipitation rate. cube.data[mask] = threshold_in_cube_units return cube
[docs] def process( self, precip_cubes: Union[Cube, List[Cube]], orographic_enhancement_cube: Cube ) -> CubeList: """Apply orographic enhancement by modifying the input fields. This can include either adding or deleting the orographic enhancement component from the input precipitation fields. Args: precip_cubes: Cube or iterable (list, CubeList or tuple) of cubes containing the input precipitation fields. orographic_enhancement_cube: Cube containing the orographic enhancement fields. Returns: CubeList of precipitation rate cubes that have been updated using orographic enhancement. """ if isinstance(precip_cubes, iris.cube.Cube): precip_cubes = iris.cube.CubeList([precip_cubes]) updated_cubes = iris.cube.CubeList([]) for precip_cube in precip_cubes: oe_cube = self._select_orographic_enhancement_cube( precip_cube, orographic_enhancement_cube.copy() ) cube = self._apply_orographic_enhancement(precip_cube, oe_cube) cube = self._apply_minimum_precip_rate(precip_cube, cube) updated_cubes.append(cube) return updated_cubes