Source code for improver.nbhood.recursive_filter

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module to apply a recursive filter to neighbourhooded data."""

from typing import List, Optional, Tuple

import iris
import numpy as np
from iris.cube import Cube, CubeList
from numpy import ndarray

from improver import PostProcessingPlugin
from improver.generate_ancillaries.generate_orographic_smoothing_coefficients import (
    OrographicSmoothingCoefficients,
)
from improver.utilities.cube_checker import check_cube_coordinates
from improver.utilities.pad_spatial import pad_cube_with_halo, remove_halo_from_cube


[docs] class RecursiveFilter(PostProcessingPlugin): """ Apply a recursive filter to the input cube. """
[docs] def __init__( self, iterations: Optional[int] = None, edge_width: int = 15, ) -> None: """ Initialise the class. Args: iterations: The number of iterations of the recursive filter. edge_width: Half the width of the padding halo applied before recursive filtering. Raises: ValueError: If number of iterations is not None and is set such that iterations is less than 1. """ if iterations is not None: if iterations < 1: raise ValueError( "Invalid number of iterations: must be >= 1: {}".format(iterations) ) self.iterations = iterations self.edge_width = edge_width self.smoothing_coefficient_name_format = "smoothing_coefficient_{}"
[docs] @staticmethod def _recurse_forward( grid: ndarray, smoothing_coefficients: ndarray, axis: int ) -> ndarray: """ Method to run the recursive filter in the forward direction. In the forward direction: Recursive filtering is calculated as: .. math:: B_i = ((1 - \\rm{smoothing\\_coefficient_{i-1}}) \\times A_i) + (\\rm{smoothing\\_coefficient_{i-1}} \\times B_{i-1}) Progressing from gridpoint i-1 to i: :math:`B_i` = new value at gridpoint i :math:`A_i` = Old value at gridpoint i :math:`B_{i-1}` = New value at gridpoint i - 1 Args: grid: 2D array containing the input data to which the recursive filter will be applied. smoothing_coefficients: Matching 2D array of smoothing_coefficient values that will be used when applying the recursive filter along the specified axis. axis: Index of the spatial axis (0 or 1) over which to recurse. Returns: 2D array containing the smoothed field after the recursive filter method has been applied to the input array in the forward direction along the specified axis. """ lim = grid.shape[axis] for i in range(1, lim): if axis == 0: grid[i, :] = (1.0 - smoothing_coefficients[i - 1, :]) * grid[ i, : ] + smoothing_coefficients[i - 1, :] * grid[i - 1, :] if axis == 1: grid[:, i] = (1.0 - smoothing_coefficients[:, i - 1]) * grid[ :, i ] + smoothing_coefficients[:, i - 1] * grid[:, i - 1] return grid
[docs] @staticmethod def _recurse_backward( grid: ndarray, smoothing_coefficients: ndarray, axis: int ) -> ndarray: """ Method to run the recursive filter in the backwards direction. In the backwards direction: Recursive filtering is calculated as: .. math:: B_i = ((1 - \\rm{smoothing\\_coefficient}) \\times A_i) + (\\rm{smoothing\\_coefficient} \\times B_{i+1}) Progressing from gridpoint i+1 to i: :math:`B_i` = new value at gridpoint i :math:`A_i` = Old value at gridpoint i :math:`B_{i+1}` = New value at gridpoint i+1 Args: grid: 2D array containing the input data to which the recursive filter will be applied. smoothing_coefficients: Matching 2D array of smoothing_coefficient values that will be used when applying the recursive filter along the specified axis. axis: Index of the spatial axis (0 or 1) over which to recurse. Returns: 2D array containing the smoothed field after the recursive filter method has been applied to the input array in the backwards direction along the specified axis. """ lim = grid.shape[axis] for i in range(lim - 2, -1, -1): if axis == 0: grid[i, :] = (1.0 - smoothing_coefficients[i, :]) * grid[ i, : ] + smoothing_coefficients[i, :] * grid[i + 1, :] if axis == 1: grid[:, i] = (1.0 - smoothing_coefficients[:, i]) * grid[ :, i ] + smoothing_coefficients[:, i] * grid[:, i + 1] return grid
[docs] @staticmethod def _run_recursion( cube: Cube, smoothing_coefficients_x: Cube, smoothing_coefficients_y: Cube, iterations: int, ) -> Cube: """ Method to run the recursive filter. Args: cube: 2D cube containing the input data to which the recursive filter will be applied. smoothing_coefficients_x: 2D cube containing array of smoothing_coefficient values that will be used when applying the recursive filter along the x-axis. smoothing_coefficients_y: 2D cube containing array of smoothing_coefficient values that will be used when applying the recursive filter along the y-axis. iterations: The number of iterations of the recursive filter Returns: Cube containing the smoothed field after the recursive filter method has been applied to the input cube. """ (x_index,) = cube.coord_dims(cube.coord(axis="x").name()) (y_index,) = cube.coord_dims(cube.coord(axis="y").name()) output = cube.data for _ in range(iterations): output = RecursiveFilter._recurse_forward( output, smoothing_coefficients_x.data, x_index ) output = RecursiveFilter._recurse_backward( output, smoothing_coefficients_x.data, x_index ) output = RecursiveFilter._recurse_forward( output, smoothing_coefficients_y.data, y_index ) output = RecursiveFilter._recurse_backward( output, smoothing_coefficients_y.data, y_index ) cube.data = output return cube
[docs] def _validate_coefficients( self, cube: Cube, smoothing_coefficients: CubeList ) -> List[Cube]: """Validate the smoothing coefficients cubes. Args: cube: 2D cube containing the input data to which the recursive filter will be applied. smoothing_coefficients: A cubelist containing two cubes of smoothing_coefficient values, one corresponding to smoothing in the x-direction, and the other to smoothing in the y-direction. Returns: A list of smoothing coefficients cubes ordered: [x-coeffs, y-coeffs]. Raises: ValueError: Smoothing coefficient cubes are not named correctly. ValueError: If any smoothing_coefficient cube value is over 0.5 ValueError: The coordinate to be smoothed within the smoothing coefficient cube is not of the expected length. ValueError: The coordinate to be smoothed within the smoothing coefficient cube does not have the expected points. """ # Ensure cubes are in x, y order. smoothing_coefficients.sort(key=lambda cell: cell.name()) axes = ["x", "y"] for axis, smoothing_coefficient in zip(axes, smoothing_coefficients): # Check the smoothing coefficient cube name is as expected expected_name = self.smoothing_coefficient_name_format.format(axis) if smoothing_coefficient.name() != expected_name: msg = ( "The smoothing coefficient cube name {} does not match the " "expected name {}".format( smoothing_coefficient.name(), expected_name ) ) raise ValueError(msg) # Check the smoothing coefficients do not exceed an empirically determined # maximum value; larger values damage conservation significantly. if (smoothing_coefficient.data > 0.5).any(): raise ValueError( "All smoothing_coefficient values must be less than 0.5. " "A large smoothing_coefficient value leads to poor " "conservation of probabilities" ) for test_axis in axes: coefficient_crd = smoothing_coefficient.coord(axis=test_axis) if test_axis == axis: expected_points = ( cube.coord(axis=test_axis).points[1:] + cube.coord(axis=test_axis).points[:-1] ) / 2 else: expected_points = cube.coord(axis=test_axis).points if len(coefficient_crd.points) != len( expected_points ) or not np.allclose(coefficient_crd.points, expected_points): msg = ( f"The smoothing coefficients {test_axis} dimension does not " "have the expected length or values compared with the cube " "to which smoothing is being applied.\n\nSmoothing " "coefficient cubes must have coordinates that are:\n" "- one element shorter along the dimension being smoothed " f"({axis}) than in the target cube, with points in that " "dimension equal to the mean of each pair of points along " "the dimension in the target cube\n- equal to the points " "in the target cube along the dimension not being smoothed" ) raise ValueError(msg) return smoothing_coefficients
[docs] def _pad_coefficients(self, coeff_x, coeff_y): """Pad smoothing coefficients""" pad_x, pad_y = [ pad_cube_with_halo( coeff, 2 * self.edge_width, 2 * self.edge_width, pad_method="symmetric", ) for coeff in [coeff_x, coeff_y] ] return pad_x, pad_y
[docs] @staticmethod def _update_coefficients_from_mask( coeffs_x: Cube, coeffs_y: Cube, mask: Cube ) -> Tuple[Cube, Cube]: """ Zero all smoothing coefficients for data points that are masked Args: coeffs_x coeffs_y mask Returns: Updated smoothing coefficients """ plugin = OrographicSmoothingCoefficients( use_mask_boundary=False, invert_mask=False ) plugin.zero_masked(coeffs_x, coeffs_y, mask) return coeffs_x, coeffs_y
[docs] def process( self, cube: Cube, smoothing_coefficients: CubeList, variable_mask: bool = False, mask_zeros: bool = False, ) -> Cube: """ Set up the smoothing_coefficient parameters and run the recursive filter. Smoothing coefficients can be generated using :class:`~.OrographicSmoothingCoefficients` and :func:`~improver.cli.generate_orographic_smoothing_coefficients`. The steps undertaken are: 1. Split the input cube into slices determined by the co-ordinates in the x and y directions. 2. Construct an array of filter parameters (smoothing_coefficients_x and smoothing_coefficients_y) for each cube slice that are used to weight the recursive filter in the x- and y-directions. 3. Pad each cube slice with a square-neighbourhood halo and apply the recursive filter for the required number of iterations. 4. Remove the halo from the cube slice and append the recursed cube slice to a 'recursed cube'. 5. Merge all the cube slices in the 'recursed cube' into a 'new cube'. 6. Modify the 'new cube' so that its scalar dimension co-ordinates are consistent with those in the original input cube. 7. Return the 'new cube' which now contains the recursively filtered values for the original input cube. The smoothing_coefficient determines how much "value" of a cell undergoing filtering is comprised of the current value at that cell and how much comes from the adjacent cell preceding it in the direction in which filtering is being applied. A larger smoothing_coefficient results in a more significant proportion of a cell's new value coming from its neighbouring cell. Args: cube: Cube containing the input data to which the recursive filter will be applied. smoothing_coefficients: A cubelist containing two cubes of smoothing_coefficient values, one corresponding to smoothing in the x-direction, and the other to smoothing in the y-direction. variable_mask Determines whether each spatial slice of the input cube can have a different mask. If False and cube is masked, a check will be made that the same mask is present on each spatial slice. If True, each spatial slice of cube may contain a different spatial mask. mask_zeros If set true all of the values of 0 in the cube will be masked, stopping the recursive filter from spreading values into these areas. They will then be unmasked later on. If the input cube was masked this mask will be reapplied to the output at the end. Returns: Cube containing the smoothed field after the recursive filter method has been applied. Raises: ValueError: If variable_mask is False and the masks on each spatial slice of cube are not identical. """ if np.ma.isMaskedArray(cube.data): cube_mask = np.ma.getmaskarray(cube.data) # If the array is masked this gets the mask so it can be used and # reapplied later on. else: cube_mask = None cube_format = next(cube.slices([cube.coord(axis="y"), cube.coord(axis="x")])) coeffs_x, coeffs_y = self._validate_coefficients( cube_format, smoothing_coefficients ) if not variable_mask and np.ma.is_masked(cube.data): # check that all spatial slices have identical masks mask_cube = next( cube.slices([cube.coord(axis="y"), cube.coord(axis="x")]) ).data.mask for cslice in cube.slices([cube.coord(axis="y"), cube.coord(axis="x")]): if not np.array_equal(cslice.data.mask, mask_cube): raise ValueError( "Input cube contains spatial slices with different masks." ) # This masks any array element that is zero. Performed after variable array # check as zeros may not be located consistently across slices. if mask_zeros: cube.data = np.ma.masked_where(cube.data == 0.0, cube.data, copy=False) recursed_cube = iris.cube.CubeList() for cslice in cube.slices([cube.coord(axis="y"), cube.coord(axis="x")]): padded_cube = pad_cube_with_halo( cslice, 2 * self.edge_width, 2 * self.edge_width, pad_method="symmetric" ) slice_coeffs_x = coeffs_x.copy() slice_coeffs_y = coeffs_y.copy() mask_cube = None if np.ma.is_masked(cslice.data): mask_cube = cslice.copy(data=cslice.data.mask) slice_coeffs_x, slice_coeffs_y = self._update_coefficients_from_mask( slice_coeffs_x, slice_coeffs_y, mask_cube, ) padded_coefficients_x, padded_coefficients_y = self._pad_coefficients( slice_coeffs_x, slice_coeffs_y ) new_cube = self._run_recursion( padded_cube, padded_coefficients_x, padded_coefficients_y, self.iterations, ) new_cube = remove_halo_from_cube( new_cube, 2 * self.edge_width, 2 * self.edge_width ) if mask_cube is not None: new_cube.data = np.ma.MaskedArray(new_cube.data, mask=mask_cube.data) recursed_cube.append(new_cube) new_cube = recursed_cube.merge_cube() if mask_zeros: new_cube.data = np.ma.getdata(new_cube.data) # This unmasks all the data on the cube if cube_mask is not None: # Reapplying the original mask so the data doesn't change. new_cube.data = np.ma.array(new_cube.data, mask=cube_mask) new_cube = check_cube_coordinates(cube, new_cube) return new_cube