# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Provides support utilities."""
import copy
import warnings
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple, Union
import cartopy.crs as ccrs
import iris
import netCDF4
import numpy as np
from cartopy.crs import CRS
from cf_units import Unit
from iris.coord_systems import CoordSystem, GeogCS
from iris.coords import AuxCoord, CellMethod, Coord, DimCoord
from iris.cube import Cube, CubeList
from numpy import ndarray
from numpy.ma import MaskedArray
from scipy.ndimage.filters import (
generic_filter,
maximum_filter,
minimum_filter,
uniform_filter,
)
from improver import BasePlugin, PostProcessingPlugin
from improver.metadata.amend import update_diagnostic_name
from improver.metadata.constants import FLOAT_DTYPE
from improver.metadata.probabilistic import in_vicinity_name_format, is_probability
from improver.metadata.utilities import (
create_new_diagnostic_cube,
generate_mandatory_attributes,
)
from improver.utilities.cube_checker import check_cube_coordinates, spatial_coords_match
from improver.utilities.cube_manipulation import enforce_coordinate_ordering
[docs]
def check_if_grid_is_equal_area(
cube: Cube, require_equal_xy_spacing: bool = True
) -> None:
"""
Identify whether the grid is an equal area grid, by checking whether points
are equally spaced along each of the x- and y-axes. By default this
function also checks whether the grid spacing is the same in both spatial
dimensions.
Args:
cube:
Cube with coordinates that will be checked.
require_equal_xy_spacing:
Flag to require the grid is equally spaced in the two spatial
dimensions (not strictly required for equal-area criterion).
Raises:
ValueError: If coordinate points are not equally spaced along either
axis (from calculate_grid_spacing)
ValueError: If point spacing is not equal for the two spatial axes
"""
x_diff = calculate_grid_spacing(cube, "metres", axis="x")
y_diff = calculate_grid_spacing(cube, "metres", axis="y")
if require_equal_xy_spacing and not np.isclose(x_diff, y_diff):
raise ValueError("Grid does not have equal spacing in x and y dimensions")
[docs]
def calculate_grid_spacing(
cube: Cube, units: Union[Unit, str], axis: str = "x", rtol: float = 1.0e-5
) -> float:
"""
Returns the grid spacing of a given spatial axis. This will be positive for
axes that stride negatively.
Args:
cube:
Cube of data on equal area grid
units:
Unit in which the grid spacing is required
axis:
Axis ('x' or 'y') to use in determining grid spacing
rtol:
relative tolerance
Returns:
Grid spacing in required unit
Raises:
ValueError: If points are not equally spaced
"""
coord = cube.coord(axis=axis).copy()
coord.convert_units(units)
diffs = np.abs(np.diff(coord.points))
diffs_mean = np.mean(diffs)
if not np.allclose(diffs, diffs_mean, rtol=rtol, atol=0.0):
raise ValueError(
"Coordinate {} points are not equally spaced".format(coord.name())
)
else:
return diffs_mean
[docs]
def distance_to_number_of_grid_cells(
cube: Cube, distance: float, axis: str = "x", return_int: bool = True
) -> Union[float, int]:
"""
Return the number of grid cells in the x and y direction based on the
input distance in metres. Requires an equal-area grid on which the spacing
is equal in the x- and y- directions.
Args:
cube:
Cube containing the x and y coordinates, which will be used for
calculating the number of grid cells in the x and y direction,
which equates to the requested distance in the x and y direction.
distance:
Distance in metres. Must be positive.
return_int:
If true only integer number of grid_cells are returned, rounded
down. If false the number of grid_cells returned will be a float.
axis:
Axis ('x' or 'y') to use in determining grid spacing
Returns:
Number of grid cells along the specified (x or y) axis equal to the
requested distance in metres.
Raises:
ValueError: If a non-positive distance is provided.
"""
d_error = f"Distance of {distance}m"
if distance <= 0:
raise ValueError(f"Please specify a positive distance in metres. {d_error}")
# calculate grid spacing along chosen axis
grid_spacing_metres = calculate_grid_spacing(cube, "m", axis=axis)
grid_cells = distance / abs(grid_spacing_metres)
if return_int:
grid_cells = int(grid_cells)
if grid_cells == 0:
zero_distance_error = f"{d_error} gives zero cell extent"
raise ValueError(zero_distance_error)
return grid_cells
[docs]
def number_of_grid_cells_to_distance(cube: Cube, grid_points: int) -> float:
"""
Calculate distance in metres equal to the given number of gridpoints
based on the coordinates on an input cube.
Args:
cube:
Cube for which the distance is to be calculated.
grid_points:
Number of grid points to convert.
Returns:
The radius in metres.
"""
check_if_grid_is_equal_area(cube)
spacing = calculate_grid_spacing(cube, "metres")
radius_in_metres = spacing * grid_points
return radius_in_metres
[docs]
class BaseDistanceCalculator(ABC):
"""Base class for distance calculators for cubes with different coordinate systems/axis types"""
[docs]
def __init__(self, cube: Cube):
"""
Args:
cube:
Cube for which the distances will be calculated.
"""
self.cube = cube
self.x_separations_axis, self.y_separation_axis = self.get_difference_axes()
[docs]
@staticmethod
def build_distances_cube(distances: ndarray, dims: List[Coord], axis: str) -> Cube:
"""
Constructs an output cube with units of metres.
Args:
distances:
Data array containing calculated distances with which to populate the output cube.
dims:
Coordinate axes for the output cube. Must match the shape of distances.
axis:
The axis along which distances have been calculated.
"""
return Cube(
distances,
long_name=f"{axis}_distance_between_grid_points",
units="m",
dim_coords_and_dims=dims,
)
[docs]
@staticmethod
def get_midpoints(axis: Coord) -> np.ndarray:
"""
Returns the midpoints along the supplied axis. If the axis is circular, the difference
between the last and first point is included with the assumption that this is in units of
degrees.
"""
points = axis.points
if axis.circular:
points = np.hstack((points, 360 + points[0]))
mean_points = (points[1:] + points[:-1]) / 2
return mean_points.astype(axis.dtype)
[docs]
def get_difference_axes(self) -> Tuple[DimCoord, DimCoord]:
"""Derives and returns the x and y coords for a cube of differences along one axis"""
input_cube_x_axis = self.cube.coord(axis="x")
input_cube_y_axis = self.cube.coord(axis="y")
distance_cube_x_axis = input_cube_x_axis.copy(
points=self.get_midpoints(input_cube_x_axis)
)
distance_cube_y_axis = input_cube_y_axis.copy(
points=self.get_midpoints(input_cube_y_axis)
)
return distance_cube_x_axis, distance_cube_y_axis
[docs]
@abstractmethod
def _get_x_distances(self) -> Cube:
"""
Abstract method for calculating distances along the x axis of the input cube.
The resulting cube shall have two dimensions as the result may be a function of position
along the y axis.
"""
[docs]
@abstractmethod
def _get_y_distances(self) -> Cube:
"""
Abstract method for calculating distances along the y axis of the input cube.
The resulting cube shall have two dimensions.
"""
[docs]
def get_distances(self) -> Tuple[Cube, Cube]:
"""
Calculates and returns the distances between grid points calculated along the cube's
x and y axis.
Returns:
- 2D Cube of x-axis distances.
- 2D Cube of y-axis distances.
"""
return self._get_x_distances(), self._get_y_distances()
[docs]
class LatLonCubeDistanceCalculator(BaseDistanceCalculator):
"""
Distance calculator for cubes using a Geographic Coordinate system.
Assumes that latitude and longitude are given in degrees, and that the origin is at the
intersection of the equator and the prime meridian.
Distances are calculated assuming a spherical earth, resulting in a < 0.15% error when compared
with the full haversine formula.
"""
def __init__(self, cube: Cube):
super().__init__(cube)
self.lats, self.longs = self._get_cube_latlon_points()
self.sphere_radius = cube.coord(axis="x").coord_system.semi_major_axis
[docs]
def _get_cube_latlon_points(self) -> Tuple[ndarray, ndarray]:
"""
Extracts the y-axis and x-axis grid points used by a cube
with a geographic coordinate system.
Returns:
- latitude points used by the cube's grid (in degrees).
- longitude points used by the cube's grid (in degrees).
Raises:
ValueError: Input cube does not use geographic coordinates, and/or
uses units other than degrees.
"""
if (
self.cube.coord(axis="x").units == "degrees"
and self.cube.coord(axis="y").units == "degrees"
):
longs = self.cube.coord(axis="x").points
lats = self.cube.coord(axis="y").points
return lats, longs
raise ValueError(
"Cannot parse spatial axes of the cube provided. "
"Expected lat-long cube with units of degrees."
)
[docs]
def _get_x_distances(self) -> Cube:
"""
Calculates the x-axis distances between adjacent grid points of a cube which uses
Geographic coordinates.
Returns:
A 2D cube containing the x-axis distances between adjacent grid points of the input
cube in metres. As the earth is an oblate spheroid, the x-axis distances vary as
a function of the y-axis.
If the x-axis is marked as being circular, the distance between the last and first
points is included in the output.
x-axis coord positions are shifted to the mid-point of each pair.
"""
lats_as_col = np.expand_dims(self.lats, axis=1)
if self.cube.coord(axis="x").circular:
longs = np.hstack([self.longs, 360 + self.longs[0]])
else:
longs = self.longs
lon_diffs = np.diff(longs)
x_distances = (
self.sphere_radius * np.cos(np.deg2rad(lats_as_col)) * np.deg2rad(lon_diffs)
)
dims = [(self.cube.coord(axis="y"), 0), (self.x_separations_axis, 1)]
return self.build_distances_cube(x_distances, dims, "x")
[docs]
def _get_y_distances(self) -> Cube:
"""
Calculates the y-axis distances between adjacent grid points of a cube which uses
Geographic coordinates.
Returns:
A 2D cube containing the y-axis distances between adjacent grid points of the input
cube in metres.
y-axis coord positions are shifted to the mid-point of each pair.
"""
lat_diffs = np.diff(self.lats)
y_distances = self.sphere_radius * np.deg2rad(lat_diffs)
y_distances_grid = np.tile(np.expand_dims(y_distances, axis=1), len(self.longs))
dims = [(self.y_separation_axis, 0), (self.cube.coord(axis="x"), 1)]
return self.build_distances_cube(y_distances_grid, dims, "y")
[docs]
class ProjectionCubeDistanceCalculator(BaseDistanceCalculator):
"""
Distance calculator for cubes using a projected coordinate system.
Assumes that x and y coordinates can be expressed in metres.
Distances are calculated assuming an equal-area projection.
"""
[docs]
def __init__(self, cube: Cube):
"""
Args:
cube:
Cube for which the distances will be calculated.
Raises:
NotImplementedError:
If the x-axis is marked as being circular.
"""
if cube.coord(axis="x").circular:
raise NotImplementedError(
"Cannot calculate distances between bounding points of a circular projected "
"coordinate."
)
super().__init__(cube)
[docs]
def _get_x_distances(self) -> Cube:
"""
Calculates the x-axis distances between adjacent grid points of a cube which uses
Equal Area coordinates.
Returns:
A 2D cube containing the x-axis distances between the grid points of the input
cube in metres.
x-axis coord positions are shifted to the mid-point of each pair.
"""
x_distances = calculate_grid_spacing(self.cube, axis="x", units="m")
data = np.full(
(self.cube.shape[0], len(self.x_separations_axis.points)), x_distances
)
dims = [
(self.cube.coord("projection_y_coordinate"), 0),
(self.x_separations_axis, 1),
]
return self.build_distances_cube(data, dims, "x")
[docs]
def _get_y_distances(self) -> Cube:
"""
Calculates the y-axis distances between adjacent grid points of a cube which uses
Equal Area coordinates.
Returns:
A 2D cube containing the y-axis distances between the grid points of the input
cube in metres.
y-axis coord positions are shifted to the mid-point of each pair.
"""
y_grid_spacing = calculate_grid_spacing(self.cube, axis="y", units="m")
data = np.full(
(len(self.y_separation_axis.points), self.cube.data.shape[1]),
y_grid_spacing,
)
dims = [
(self.y_separation_axis, 0),
(self.cube.coord("projection_x_coordinate"), 1),
]
return self.build_distances_cube(data, dims, "y")
[docs]
class DistanceBetweenGridSquares(BasePlugin):
"""
Calculates the distances between adjacent grid squares within a cube.
The distances are calculated along the x and y axes individually.
Returned distances are in metres.
The class can handle cubes with either Geographic (lat-long) or Equal Area projections.
For lat-lon cubes, the distances are calculated assuming a spherical earth.
This causes a < 0.15% error compared with the full haversine formula.
"""
[docs]
def _select_distance_calculator(self, cube: Cube):
"""
Chooses which distance calculator class to apply based on the cube's spatial coordinates.
Args:
cube:
Cube for which the distances will be calculated.
Raises:
ValueError: Cube does not have enough information from which to calculate distances
or uses an unsupported coordinate system.
"""
if self._cube_xy_dimensions_are_distances(cube):
self.distance_calculator = ProjectionCubeDistanceCalculator(cube)
elif self._get_cube_spatial_type(cube) == GeogCS:
self.distance_calculator = LatLonCubeDistanceCalculator(cube)
else:
raise ValueError(
"Unsupported cube coordinate system or insufficent information to "
"calculate cube distances. Cube must either have coordinates for the "
"x and y axis with distance units, or use the Geographic (GeogCS) "
"coordinate system. For cubes with x and y dimensions expressed as angles, "
"distance between points cannot be calculated without a coordinate system."
)
[docs]
@staticmethod
def _get_cube_spatial_type(cube: Cube) -> CoordSystem:
"""
Finds the coordinate system used by a cube.
Args:
cube:
Cube to find the coordinate system of.
Returns:
The coordinate system of the cube as an Iris Coordinate System.
"""
coord_system = cube.coord_system()
return type(coord_system)
[docs]
@staticmethod
def _cube_xy_dimensions_are_distances(cube: Cube) -> bool:
"""
Returns true if the given cube has coordinates mapping to the x and y axes with units
measuring distance (as opposed to angular separation) and false otherwise.
Args:
cube:
The iris cube to evaluate.
Returns:
Boolean representing whether the cube has x and y axes defined in a distance unit.
"""
try:
cube.coord(axis="x").convert_units("m")
cube.coord(axis="y").convert_units("m")
return True
except (
TypeError,
ValueError,
iris.exceptions.UnitConversionError,
iris.exceptions.CoordinateNotFoundError,
):
return False
[docs]
def process(self, cube: Cube) -> Tuple[Cube, Cube]:
"""
Calculate the distances between grid points along the x and y axes
and return the result in separate cubes.
Args:
cube:
Cube for which the distances will be calculated.
Returns:
- Cube of x-axis distances.
- Cube of y-axis distances.
"""
self._select_distance_calculator(cube)
return self.distance_calculator.get_distances()
[docs]
class DifferenceBetweenAdjacentGridSquares(BasePlugin):
"""
Calculate the difference between adjacent grid squares within
a cube. The difference is calculated along the x and y axes
individually.
"""
[docs]
@staticmethod
def _axis_wraps_around_meridian(axis: Coord, cube: Cube) -> bool:
"""Returns true if the cube is 'circular' with the given axis wrapping around, i.e. if there
is a smooth transition between 180 degrees and -180 degrees on the axis.
Args:
axis:
Axis to check for circularity.
cube:
The cube to which the axis belongs.
Returns:
True if the axis wraps around the meridian; false otherwise.
"""
return axis.circular and axis == cube.coord(axis="x")
[docs]
def create_difference_cube(
self, cube: Cube, coord_name: str, diff_along_axis: ndarray
) -> Cube:
"""
Put the difference array into a cube with the appropriate metadata.
Args:
cube:
Cube from which the differences have been calculated.
coord_name:
The name of the coordinate over which the difference
have been calculated.
diff_along_axis:
Array containing the differences.
Returns:
Cube containing the differences calculated along the
specified axis.
"""
axis = cube.coord(coord_name)
points = axis.points
if self._axis_wraps_around_meridian(axis, cube):
points = np.hstack((points, 360 + points[0]))
if type(axis.coord_system) is not GeogCS:
raise NotImplementedError(
"DifferenceBetweenAdjacentGridSquares does not support cubes with "
"circular x-axis that do not use a geographic (i.e. latlon) coordinate system."
)
mean_points = (points[1:] + points[:-1]) / 2
# Copy cube metadata and coordinates into a new cube.
# Create a new coordinate for the coordinate along which the
# difference has been calculated.
metadata_dict = copy.deepcopy(cube.metadata._asdict())
diff_cube = Cube(diff_along_axis, **metadata_dict)
for coord in cube.dim_coords:
dims = cube.coord_dims(coord)
if coord.name() in [coord_name]:
coord = coord.copy(points=mean_points)
diff_cube.add_dim_coord(coord.copy(), dims)
for coord in cube.aux_coords:
dims = cube.coord_dims(coord)
diff_cube.add_aux_coord(coord.copy(), dims)
for coord in cube.derived_coords:
dims = cube.coord_dims(coord)
diff_cube.add_aux_coord(coord.copy(), dims)
return diff_cube
[docs]
def calculate_difference(self, cube: Cube, coord_name: str) -> ndarray:
"""
Calculate the difference along the axis specified by the
coordinate.
Args:
cube:
Cube from which the differences will be calculated.
coord_name:
Name of coordinate along which the difference is calculated.
Returns:
Array after the differences have been calculated along the
specified axis.
"""
diff_axis = cube.coord(name_or_coord=coord_name)
diff_axis_number = cube.coord_dims(coord_name)[0]
diff_along_axis = np.diff(cube.data, axis=diff_axis_number)
if self._axis_wraps_around_meridian(diff_axis, cube):
# Get wrap-around difference:
first_column = np.take(cube.data, indices=0, axis=diff_axis_number)
last_column = np.take(cube.data, indices=-1, axis=diff_axis_number)
wrap_around_diff = first_column - last_column
# Apply wrap-around difference vector to diff array:
if diff_axis_number == 0:
diff_along_axis = np.vstack([diff_along_axis, wrap_around_diff])
elif diff_axis_number == 1:
diff_along_axis = np.hstack(
[diff_along_axis, wrap_around_diff.reshape([-1, 1])]
)
return diff_along_axis
[docs]
def process(self, cube: Cube) -> Tuple[Cube, Cube]:
"""
Calculate the difference along the x and y axes and return
the result in separate cubes. The difference along each axis is
calculated using numpy.diff.
Args:
cube:
Cube from which the differences will be calculated.
Returns:
- Cube after the differences have been calculated along the
x axis.
- Cube after the differences have been calculated along the
y axis.
"""
diffs = []
for axis in ["x", "y"]:
coord_name = cube.coord(axis=axis).name()
difference = self.calculate_difference(cube, coord_name)
diff_cube = self.create_difference_cube(cube, coord_name, difference)
self._update_metadata(diff_cube, coord_name, cube.name())
diffs.append(diff_cube)
return tuple(diffs)
[docs]
class GradientBetweenAdjacentGridSquares(PostProcessingPlugin):
"""Calculate the gradients between adjacent grid squares within
a cube. The gradient is calculated along the x and y axis
individually."""
[docs]
def __init__(self, regrid: bool = False) -> None:
"""Initialise plugin.
Args:
regrid:
If True, the gradient cube is regridded to match the spatial
dimensions of the input cube. If False, the two output gradient cubes will have
different spatial coords such that the coord matching the gradient axis will
represent the midpoint of the input cube and will have one fewer points.
If the x-axis is marked as circular, the gradient between the last and first points
is also included.
"""
self.regrid = regrid
[docs]
@staticmethod
def _create_output_cube(gradient: Cube, name: str) -> Cube:
"""
Create the output gradient cube, inheriting all metadata from source, but discarding
the "form_of_difference" attribute.
Args:
gradient:
Gradient values used in the data array of the resulting cube.
name:
Name to apply to the output cube.
Returns:
A cube of the gradients in the coordinate direction specified.
"""
attributes = gradient.attributes
attributes.pop("form_of_difference")
grad_cube = create_new_diagnostic_cube(
name,
gradient.units,
gradient,
generate_mandatory_attributes([gradient]),
optional_attributes=attributes,
data=gradient.data,
)
return grad_cube
[docs]
def process(self, cube: Cube) -> CubeList[Cube, Cube]:
"""
Calculate the gradient along the x and y axes and return
the result in separate cubes. The difference along each axis is
calculated using numpy.diff. This is then divided by the distance
between grid points along the same axis to get the gradient.
Args:
cube:
Cube from which the differences will be calculated.
Returns:
- Cube after the gradients have been calculated along the
x-axis.
- Cube after the gradients have been calculated along the
y-axis.
"""
axis = ["x", "y"]
gradients = CubeList([])
diffs = DifferenceBetweenAdjacentGridSquares()(cube)
distances = DistanceBetweenGridSquares()(cube)
for diff, distance, ax in zip(diffs, distances, axis):
distance.data = distance.data.astype(np.float32)
gradient = diff / distance
grad_cube = self._create_output_cube(
gradient, "gradient_of_" + cube.name() + "_wrt_" + ax
)
if self.regrid:
grad_cube = grad_cube.regrid(cube, iris.analysis.Linear())
gradients.append(grad_cube)
return gradients
[docs]
def operator_within_vicinity(
apply_filter: callable,
fill_value: Union[float, int],
grid: Union[MaskedArray, ndarray],
grid_point_radius: int,
landmask: Optional[ndarray] = None,
) -> Union[MaskedArray, ndarray]:
"""
Evaluate the specified filter over grid points within a defined radius.
Where the operator applied is either an area maxima or minima, all grid
points within the vicinity of the maxima/minima are recorded as having the
same occurrence. If a land-mask has been supplied, process land and sea
points separately.
Args:
apply_filter:
The filter to apply within the vicinity of each point.
fill_value:
The fill-value to use when masking out points within the grid.
grid:
An array of values to which the process is applied.
grid_point_radius:
The radius in grid points about each point within which to
determine the maximum value.
landmask:
A binary grid of the same size as grid that differentiates
between land and sea points to allow the different surface
types to be processed independently.
Returns:
Array where the occurrences have been spatially spread as per
the filter method applied. For maxima/minima, they're equally
likely to have occurred anywhere within the vicinity defined
using the specified radius.
"""
# Convert the grid_point_radius into a number of points along an edge
# length, including the central point, e.g. grid_point_radius = 1,
# points along the edge = 3
grid_points = (2 * grid_point_radius) + 1
processed_grid = grid.copy()
if np.ma.is_masked(grid):
unmasked_grid = grid.data.copy()
unmasked_grid[grid.mask] = fill_value
else:
unmasked_grid = grid.copy()
if landmask is not None:
patch_data = np.empty_like(grid)
for match in (True, False):
matched_data = unmasked_grid.copy()
matched_data[landmask != match] = fill_value
matched_patch_data = apply_filter(matched_data, grid_points)
patch_data = np.where(landmask == match, matched_patch_data, patch_data)
else:
# The following command finds the value for the specified operation for
# each grid point from within a square of length "grid_points"
patch_data = apply_filter(unmasked_grid, grid_points)
if np.ma.is_masked(grid):
# Update only the unmasked values
processed_grid.data[~grid.mask] = patch_data[~grid.mask]
else:
processed_grid = patch_data
return processed_grid
[docs]
def maximum_within_vicinity(
grid: Union[MaskedArray, ndarray],
grid_point_radius: int,
landmask: Optional[ndarray] = None,
) -> Union[MaskedArray, ndarray]:
"""
Find grid points where a phenomenon occurs within a defined radius.
The occurrences within this vicinity are maximised, such that all
grid points within the vicinity are recorded as having an occurrence.
For non-binary fields, if the vicinity of two occurrences overlap,
the maximum value within the vicinity is chosen.
If a land-mask has been supplied, process land and sea points
separately.
Args:
grid:
An array of values to which the process is applied.
grid_point_radius:
The radius in grid points about each point within which to
determine the maximum value.
landmask:
A binary grid of the same size as grid that differentiates
between land and sea points to allow the different surface
types to be processed independently.
Returns:
Array where maximum is evaluated over spatial area; values are
spatially spread, so that they're equally likely to have occurred
anywhere within the vicinity defined using the specified radius.
"""
def _apply_max_filter(data, width):
if np.any(np.isnan(data)):
# Fix-me: from scipy version 1.6.0, vectorized_filter method exists
# which can significantly speed up generic_filter methods.
msg = (
"This method utilises the scipy generic_filter which is inefficient"
"for large grids."
)
warnings.warn(msg)
return generic_filter(data, np.nanmax, size=width, mode="nearest")
else:
return maximum_filter(data, size=width, mode="nearest")
# Value, the negative of which is used to fill masked points, ensuring
# that when we take a maximum the masked points do not contribute.
fill_value = -1 * netCDF4.default_fillvals.get(grid.dtype.str[1:], np.inf)
processed_grid = operator_within_vicinity(
_apply_max_filter, fill_value, grid, grid_point_radius, landmask
)
return processed_grid
[docs]
def minimum_within_vicinity(
grid: Union[MaskedArray, ndarray],
grid_point_radius: int,
landmask: Optional[ndarray] = None,
) -> Union[MaskedArray, ndarray]:
"""
Find grid points where a phenomenon occurs within a defined radius.
The occurrences within this vicinity are minimised. For binary fields,
grid points within the vicinity of a non-occurrence are all recorded
as being a non-occurence. For non-binary fields, if the vicinity of
two occurrences overlap, the minimum value within the vicinity is
chosen. If a land-mask has been supplied, process land and sea points
separately.
Args:
grid:
An array of values to which the process is applied.
grid_point_radius:
The radius in grid points about each point within which to
determine the minimum value.
landmask:
A binary grid of the same size as grid that differentiates
between land and sea points to allow the different surface
types to be processed independently.
Returns:
Array where minimum is evaluated over spatial area; values are
spatially spread, so that they're equally likely to have occurred
anywhere within the vicinity defined using the specified radius.
"""
def _apply_min_filter(data, width):
if np.any(np.isnan(data)):
# Fix-me: from scipy version 1.6.0, vectorized_filter method exists
# which can significantly speed up generic_filter methods.
msg = (
"This method utilises the scipy generic_filter which is inefficient"
"for large grids."
)
warnings.warn(msg)
return generic_filter(data, np.nanmin, size=width, mode="nearest")
else:
return minimum_filter(data, size=width, mode="nearest")
# Value, which is used to fill masked points, ensuring that when we
# take a minimum the masked points do not contribute.
fill_value = netCDF4.default_fillvals.get(grid.dtype.str[1:], np.inf)
processed_grid = operator_within_vicinity(
_apply_min_filter, fill_value, grid, grid_point_radius, landmask
)
return processed_grid
[docs]
def mean_within_vicinity(
grid: Union[MaskedArray, ndarray],
grid_point_radius: int,
landmask: Optional[ndarray] = None,
) -> Union[MaskedArray, ndarray]:
"""
Find mean values over grid points within a defined radius.
If a land-mask has been supplied, process land and sea points
separately.
Args:
grid:
An array of values to which the process is applied.
grid_point_radius:
The radius in grid points about each point within which to
determine the mean value.
landmask:
A binary grid of the same size as grid that differentiates
between land and sea points to allow the different surface
types to be processed independently.
Returns:
Array where mean is evaluated over spatial area; values are
centred on each grid looking within the vicinity defined by
the specified radius.
"""
def _apply_mean_filter(data, width):
if np.any(np.isnan(data)):
# Fix-me: from scipy version 1.6.0, vectorized_filter method exists
# which can significantly speed up generic_filter methods.
msg = (
"This method utilises the scipy generic_filter which is inefficient"
"for large grids."
)
warnings.warn(msg)
return generic_filter(data, np.nanmean, size=width, mode="nearest")
else:
return uniform_filter(data, size=width, mode="nearest")
fill_value = np.nan
processed_grid = operator_within_vicinity(
_apply_mean_filter, fill_value, grid, grid_point_radius, landmask
)
return processed_grid
[docs]
def std_within_vicinity(
grid: Union[MaskedArray, ndarray],
grid_point_radius: int,
landmask: Optional[ndarray] = None,
) -> Union[MaskedArray, ndarray]:
"""
Find the standard deviation values over grid points within a defined
radius. If a land-mask has been supplied, process land and sea points
separately.
Args:
grid:
An array of values to which the process is applied.
grid_point_radius:
The radius in grid points about each point within which to
determine the standard deviation.
landmask:
A binary grid of the same size as grid that differentiates
between land and sea points to allow the different surface
types to be processed independently.
Returns:
Array where standard deviation is evaluated over spatial area;
values are centred on each grid looking within the vicinity defined
by the specified radius.
"""
def _apply_std_filter(data, width):
if np.any(np.isnan(data)):
# Fix-me: from scipy version 1.6.0, vectorized_filter method exists
# which can significantly speed up generic_filter methods.
msg = (
"This method utilises the scipy generic_filter which is inefficient"
"for large grids."
)
warnings.warn(msg)
return generic_filter(data, np.nanstd, size=width, mode="nearest")
else:
# Fix-me: from scipy version 1.6.0, vectorized_filter method exists
# which can significantly speed up generic_filter methods.
return generic_filter(data, np.std, size=width, mode="nearest")
fill_value = np.nan
processed_grid = operator_within_vicinity(
_apply_std_filter, fill_value, grid, grid_point_radius, landmask
)
return processed_grid
[docs]
def rename_vicinity_cube(cube: Cube, new_name: str = None):
"""
Rename a cube in place to indicate the cube has been vicinity processed.
Args:
cube:
Cube to be renamed.
"""
if new_name is not None:
cube.rename(new_name)
else:
if is_probability(cube):
cube.rename(in_vicinity_name_format(cube.name()))
else:
cube.rename(f"{cube.name()}_in_vicinity")
[docs]
def create_vicinity_coord(
radius: Union[float, int], native_grid_point_radius: bool = False
) -> AuxCoord:
"""
Create a coordinate that records the vicinity radius passed in.
This radius may be a distance in physical units, or if
native_grid_point_radius is True, it will be a number of grid cells.
If the latter an attribute comment is added to note that the radius
is in grid cells.
Args:
radius:
The radius as a physical distance or number of grid points, the
value of which is recorded in the coordinate.
native_grid_point_radius:
If set to True the radius is provided a a number of grid points
and the metadata created will reflect that.
"""
if native_grid_point_radius:
point = np.array(radius, dtype=np.float32)
units = "1"
attributes = {
"comment": "Units of 1 indicate radius of vicinity is defined "
"in grid points rather than physical distance"
}
else:
point = np.array(radius, dtype=FLOAT_DTYPE)
units = "m"
attributes = {}
coord = AuxCoord(
point, units=units, long_name="radius_of_vicinity", attributes=attributes
)
return coord
[docs]
def set_vicinity_cell_method(cube: Cube, operation: str) -> None:
"""
Add cell method to the cube to describe the vicinity operation applied.
Args:
cube:
Cube to which to add the cell method.
operation:
The operation type that has been applied to the spatial dimensions
of the cube through the in-vicinity calculations.
"""
cube.add_cell_method(CellMethod(method=operation, coords="area"))
[docs]
class OccurrenceWithinVicinity(PostProcessingPlugin):
"""Calculate whether a phenomenon occurs within the specified radii about
a point. These radii can be given in metres, or as numbers of grid points.
Each radius provided will result in a distinct output, with these demarked
using a `radius_of_vicinity` coordinate on the resulting cube. If a single
radius is provided, this will be a scalar coordinate.
Radii in metres may be used with data on a equal areas projection only.
Grid_point_radii will work with any projection, with caveats.
.. Further information is available in:
.. include:: extended_documentation/utilities/spatial/
occurrence_within_vicinity.rst
"""
SUPPORTED_VICINITY_OPERATORS = {
"max": maximum_within_vicinity,
"min": minimum_within_vicinity,
"mean": mean_within_vicinity,
"std": std_within_vicinity,
}
SUPPORTED_VICINITY_CELL_METHODS = {
"max": "maximum",
"min": "minimum",
"mean": "mean",
"std": "standard_deviation",
}
[docs]
def __init__(
self,
radii: Optional[List[Union[float, int]]] = None,
grid_point_radii: Optional[List[Union[float, int]]] = None,
land_mask_cube: Cube = None,
new_name: str = None,
operator: str = "max",
) -> None:
"""
Args:
radii:
A list of radii in metres used to define the vicinities within
which to search for occurrences.
grid_point_radii:
Alternatively, a list of numbers of grid points that define the
vicinity radii over which to search for occurrences. Only one of
radii or grid_point_radii should be set.
land_mask_cube:
Binary land-sea mask data. True for land-points, False for sea.
Restricts in-vicinity processing to only include points of a
like mask value.
new_name:
New name to give to the resultant cube
Operator:
Operator to evaluate over the vicinities. Defaults to max.
Raises:
ValueError: If both radii and grid point radii are set.
ValueError: If neither radii or grid point radii are set.
ValueError: If a provided vicinity radius is negative.
ValueError: Land mask not named land_binary_mask.
ValueError: If operator not in SUPPORTED_VICINITY_OPERATORS.
"""
if radii:
radii = [float(x) for x in radii]
if radii and grid_point_radii:
raise ValueError(
"Vicinity processing requires that only one of radii or "
"grid_point_radii should be set"
)
if not radii and not grid_point_radii:
raise ValueError(
"Vicinity processing requires that one of radii or "
"grid_point_radii should be set to a non-zero value"
)
if (radii and any(np.array(radii) < 0)) or (
grid_point_radii and any(np.array(grid_point_radii) < 0)
):
raise ValueError(
"Vicinity processing requires only positive vicinity radii"
)
self.radii = radii if radii else grid_point_radii
self.native_grid_point_radius = False if radii else True
if land_mask_cube:
if land_mask_cube.name() != "land_binary_mask":
raise ValueError(
f"Expected land_mask_cube to be called land_binary_mask, "
f"not {land_mask_cube.name()}"
)
self.land_mask = np.where(land_mask_cube.data >= 0.5, True, False)
else:
self.land_mask = None
self.land_mask_cube = land_mask_cube
if operator not in OccurrenceWithinVicinity.SUPPORTED_VICINITY_OPERATORS.keys():
raise ValueError("Unsupported operator to apply over vicinity.")
else:
self.vicinity_operator = (
OccurrenceWithinVicinity.SUPPORTED_VICINITY_OPERATORS[operator]
)
self.cell_method = OccurrenceWithinVicinity.SUPPORTED_VICINITY_CELL_METHODS[
operator
]
[docs]
def process(self, cube: Cube, new_name: str = None) -> Cube:
"""
Produces the vicinity processed data. The input data is sliced to
yield y-x slices to which the <operator>_within_vicinity method is applied.
The different vicinity radii (if multiple) are looped over and a
coordinate recording the radius used is added to each resulting cube.
A single cube is returned with the leading coordinates of the input cube
preserved. If a single vicinity radius is provided, a new scalar
radius_of_vicinity coordinate will be found on the returned cube. If
multiple radii are provided, this coordinate will be a dimension
coordinate following any probabilistic / realization coordinates.
Note that when applying the mean operator to thresholded data, this plugin
will produce output equivalent to the NeighbourhoodProcessing plugin
(disregarding the boundary), but less efficiently (particularly when
applied to masked or datasets containing NaNs). For such cases, it is
recommended that one use the NeighbourhoodProcessing plugin instead.
Args:
cube:
Thresholded cube.
new_name:
Name to assign to the resultant cube after calculating the vicinity
values for the specified operator. Where no value is provided, the
cube will retain the same name as the input cube.
Returns:
Cube containing the occurrences within a vicinity for each radius,
calculated for each yx slice, which have been merged to yield a
single cube.
Raises:
ValueError: Cube and land mask have differing spatial coordinates.
"""
if self.land_mask_cube and not spatial_coords_match(
[cube, self.land_mask_cube]
):
raise ValueError(
"Supplied cube do not have the same spatial coordinates and land mask"
)
if not self.native_grid_point_radius:
grid_point_radii = [
distance_to_number_of_grid_cells(cube, radius) for radius in self.radii
]
else:
grid_point_radii = self.radii
radii_cubes = CubeList()
# List of non-spatial dimensions to restore as leading on the output.
leading_dimensions = [
crd.name() for crd in cube.coords(dim_coords=True) if not crd.coord_system
]
for radius, grid_point_radius in zip(self.radii, grid_point_radii):
vicinity_cubes = CubeList([])
for cube_slice in cube.slices([cube.coord(axis="y"), cube.coord(axis="x")]):
result = cube_slice.copy(
data=self.vicinity_operator(
cube_slice.data, grid_point_radius, self.land_mask
)
)
vicinity_cubes.append(result)
result_cube = vicinity_cubes.merge_cube()
# Put dimensions back if they were there before.
result_cube = check_cube_coordinates(cube, result_cube)
# Add a coordinate recording the vicinity radius applied to the data.
vic_coord = create_vicinity_coord(radius, self.native_grid_point_radius)
result_cube.add_aux_coord(vic_coord)
radii_cubes.append(result_cube)
# Merge cubes produced for each vicinity radius.
result_cube = radii_cubes.merge_cube()
# Rename the variable if a new_name argument has been set.
# Set cube name to reflect vicinity processing.
rename_vicinity_cube(result_cube, new_name)
# Enforce order of leading dimensions on the output to match the input.
enforce_coordinate_ordering(result_cube, leading_dimensions)
# Add cell method to describe the vicinity operation applied.
set_vicinity_cell_method(result_cube, operation=self.cell_method)
return result_cube
[docs]
def lat_lon_determine(cube: Cube) -> Optional[CRS]:
"""
Test whether a diagnostic cube is on a latitude/longitude grid or uses an
alternative projection.
Args:
cube:
A diagnostic cube to examine for coordinate system.
Returns:
Coordinate system of the diagnostic cube in a cartopy format unless
it is already a latitude/longitude grid, in which case None is
returned.
"""
trg_crs = None
if (
not cube.coord(axis="x").name() == "longitude"
or not cube.coord(axis="y").name() == "latitude"
):
trg_crs = cube.coord_system().as_cartopy_crs()
return trg_crs
[docs]
def get_grid_y_x_values(cube: Cube) -> Tuple[ndarray, ndarray]:
"""Extract the y and x coordinate values of each points in the cube.
The result is defined over the spatial grid, of shape (ny, nx) where
ny is the length of the y-axis coordinate and nx the length of the
x-axis coordinate.
Args:
cube:
Cube with points to extract
Returns:
- Array of shape (ny, nx) containing y coordinate values
- Array of shape (ny, nx) containing x coordinate values
"""
x_points = cube.coord(axis="x").points
y_points = cube.coord(axis="y").points
nx = len(x_points)
ny = len(y_points)
x_zeros = np.zeros_like(x_points)
y_zeros = np.zeros_like(y_points)
# Broadcast x points and y points onto grid
all_x_points = y_zeros.reshape(ny, 1) + x_points.reshape(1, nx)
all_y_points = y_points.reshape(ny, 1) + x_zeros.reshape(1, nx)
return all_y_points, all_x_points
[docs]
def update_name_and_vicinity_coord(cube: Cube, new_name: str, vicinity_radius: float):
"""
Updates a cube with a new probabilistic-style name and replaces or adds a radius_of_vicinity
coord with the specified radius.
Args:
cube: Cube to be updated in-place
new_name: The new name to be applied to the Cube, the threshold coord and any related
cell methods.
vicinity_radius: The point value for the radius_of_vicinity coord. The units are assumed
to be the same as the x and y spatial coords of the Cube
"""
if new_name:
update_diagnostic_name(cube, new_name, cube)
if vicinity_radius:
# The cube blending will drop the radius_of_vicinity coord if the source cubes have
# differing points. We can use this to determine whether the vicinities matched:
vicinities_matched = "radius_of_vicinity" in [
coord.name() for coord in cube.coords()
]
if vicinities_matched:
cube.remove_coord("radius_of_vicinity")
add_vicinity_coordinate(
cube, vicinity_radius, radius_is_max=not vicinities_matched
)
[docs]
def add_vicinity_coordinate(
cube: Cube,
radius: Union[float, int],
native_grid_point_radius: bool = False,
radius_is_max: bool = False,
) -> None:
"""
Add a coordinate to the cube that records the vicinity radius that
has been applied to the data.
Args:
cube:
Vicinity processed cube.
radius:
The radius as a physical distance (m) or number of grid points, the
value of which is recorded in the coordinate.
native_grid_point_radius:
True if radius is "number of grid points", else False
radius_is_max:
True if the specified radius represents a maximum value from the source data. A
comment is associated with the coord in this case.
"""
attributes = {}
if radius_is_max:
attributes["comment"] = "Maximum"
if native_grid_point_radius:
point = np.array(radius, dtype=np.float32)
units = "1"
comment = (
"Units of 1 indicate radius of vicinity is defined "
"in grid points rather than physical distance"
)
attributes["comment"] = "; ".join(
[n for n in [attributes.get("comment", None), comment] if n]
)
else:
point = np.array(radius, dtype=np.float32)
units = "m"
coord = AuxCoord(
point, units=units, long_name="radius_of_vicinity", attributes=attributes
)
cube.add_aux_coord(coord)