Source code for improver.utilities.cube_extraction

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Utilities to parse a list of constraints and extract matching subcube"""

from ast import literal_eval
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from iris import Constraint
from iris.cube import Cube, CubeList
from iris.exceptions import CoordinateNotFoundError

from improver import BasePlugin
from improver.metadata.constants import FLOAT_DTYPE
from improver.metadata.utilities import minimum_increment
from improver.utilities.common_input_handle import as_cube
from improver.utilities.cube_constraints import create_sorted_lambda_constraint
from improver.utilities.cube_manipulation import get_dim_coord_names


[docs] def parse_range_string_to_dict(value: str) -> Dict[str, str]: """ Splits up a string in the form "[min:max:step]" into a list of [min, max, step]. Args: value: A string containing the range information. It is assumed that the input value is of the form: "[2:10]". Returns: A list containing the min and max (and step). """ value = value.replace("[", "").replace("]", "").split(":") return dict(zip(["min", "max", "step"], value))
[docs] def create_constraint(value: Union[float, List[float]]) -> Union[Callable, List[int]]: """ Constructs an appropriate constraint for matching numerical values if they are floating point. If not, the original values are returned as a list (even if they were single valued on entry). Args: value: Constraint values that are being used to match against values in a cube for the purposes of extracting elements of the cube. Returns: If the input value(s) are floating point this function returns a lambda function that will enable for approximate matching to ensure they can be matched to cube values. If the inputs are int or non-numeric, they will be returned unchanged, except for single values that will have become lists. """ if not isinstance(value, list): value = [value] if np.issubdtype(np.array(value).dtype, np.number): return lambda cell: any(np.isclose(cell.point, value)) return value
[docs] def parse_constraint_list( constraints: List[str], units: Optional[List[str]] = None ) -> Tuple[Constraint, Optional[Dict], Optional[float], Optional[Dict]]: """ For simple constraints of a key=value format, these are passed in as a list of strings and converted to key-value pairs prior to creating the constraints. For more complex constraints, the list of strings given as input are evaluated by parsing for specific identifiers and then the constraints are created as required. The simple key-value pairs and other constraints are merged into a single constraint. Args: constraints: List of string constraints with keys and values split by "=": e.g: ["kw1=val1", "kw2 = val2", "kw3=val3"], where the vals could include ranges e.g. [0:20] or ranges with a step value e.g. [0:20:3]. units: List of units (as strings) corresponding to each coordinate in the list of constraints. One or more "units" may be None, and units may only be associated with coordinate constraints. Returns: - A combination of all the constraints that were supplied. - A dictionary of unit keys and values - A list containing the min and max values for a longitude constraint - A dictionary of coordinate and the step value, i.e. a step of 2 will skip every other point """ if units is None: list_units = len(constraints) * [None] units_dict = None else: if len(units) != len(constraints): msg = "units list must match constraints" raise ValueError(msg) list_units = units units_dict = {} simple_constraints_dict = {} complex_constraints = [] longitude_constraint = None thinning_values = {} for constraint_pair, unit_val in zip(constraints, list_units): key, value = constraint_pair.split("=", 1) key = key.strip(" ") value = value.strip(" ") if ":" in value: range_dict = parse_range_string_to_dict(value) # longitude is a circular coordinate, so needs to be treated in a # different way to a normal constraint if key == "longitude": longitude_constraint = [ FLOAT_DTYPE(range_dict[k]) for k in ["min", "max"] ] else: complex_constraints.append( create_sorted_lambda_constraint( key, [range_dict["min"], range_dict["max"]] ) ) if range_dict.get("step", None): thinning_values[key] = int(range_dict["step"]) else: try: typed_value = literal_eval(value) except ValueError: simple_constraints_dict[key] = value else: simple_constraints_dict[key] = create_constraint(typed_value) if unit_val is not None and unit_val.capitalize() != "None": units_dict[key] = unit_val.strip(" ") if simple_constraints_dict: simple_constraints = Constraint(**simple_constraints_dict) else: simple_constraints = None constraints = simple_constraints for constr in complex_constraints: constraints = constraints & constr return constraints, units_dict, longitude_constraint, thinning_values
[docs] def apply_extraction( cube: Cube, constraint: Constraint, units: Optional[Dict] = None, use_original_units: bool = True, longitude_constraint: Optional[List] = None, ) -> Cube: """ Using a set of constraints, extract a subcube from the provided cube if it is available. Args: cube: The cube from which a subcube is to be extracted. constraint: The constraint or ConstraintCombination that will be used to extract a subcube from the input cube. units: A dictionary of units for the constraints. Supplied if any coordinate constraints are provided in different units from those of the input cube (eg precip in mm/h for cube threshold in m/s). use_original_units: Boolean to state whether the coordinates used in the extraction should be converted back to their original units. The default is True, indicating that the units should be converted back to the original units. longitude_constraint: List containing the min and max values for the longitude. This has to be treated separately to the normal constraints due to the circular nature of longitude. Returns: A single cube matching the input constraints, or None if no subcube is found within cube that matches the constraints. """ if units is None: output_cube = cube.extract(constraint) else: original_units = {} for coord in units.keys(): original_units[coord] = cube.coord(coord).units cube.coord(coord).convert_units(units[coord]) output_cube = cube.extract(constraint) if use_original_units: for coord in original_units: cube.coord(coord).convert_units(original_units[coord]) try: output_cube.coord(coord).convert_units(original_units[coord]) except AttributeError: # an empty output cube (None) is handled by the CLI pass if longitude_constraint: output_cube = output_cube.intersection( longitude=longitude_constraint, ignore_bounds=True ) # TODO: Below can be removed when https://github.com/SciTools/iris/issues/4119 # is fixed output_cube.coord("longitude").points = output_cube.coord( "longitude" ).points.astype(FLOAT_DTYPE) if output_cube.coord("longitude").bounds is not None: output_cube.coord("longitude").bounds = output_cube.coord( "longitude" ).bounds.astype(FLOAT_DTYPE) return output_cube
[docs] def extract_subcube( cube: Cube, constraints: List[str], units: Optional[List[str]] = None, use_original_units: bool = True, ) -> Optional[Cube]: """ Using a set of constraints, extract a subcube from the provided cube if it is available. Args: cube: The cube from which a subcube is to be extracted. constraints: List of string constraints with keys and values split by "=": e.g: ["kw1=val1", "kw2 = val2", "kw3=val3"]. units: List of units (as strings) corresponding to each coordinate in the list of constraints. One or more "units" may be None, and units may only be associated with coordinate constraints. use_original_units: Boolean to state whether the coordinates used in the extraction should be converted back to their original units. The default is True, indicating that the units should be converted back to the original units. Returns: A single cube matching the input constraints, or None if no subcube is found within cube that matches the constraints. """ constraints, units, longitude_constraint, thinning_dict = parse_constraint_list( constraints, units=units ) output_cube = apply_extraction( cube, constraints, units, use_original_units, longitude_constraint ) if thinning_dict: output_cube = thin_cube(output_cube, thinning_dict) return output_cube
[docs] class ExtractSubCube(BasePlugin): """Extract a subcube from the provided cube, given constraints."""
[docs] def __init__( self, constraints: List[str], units: Optional[List[str]] = None, use_original_units: bool = True, ignore_failure: bool = False, ) -> None: """ Set up the ExtractSubCube plugin. Args: constraints: List of string constraints with keys and values split by "=": e.g: ["kw1=val1", "kw2 = val2", "kw3=val3"]. units: List of units (as strings) corresponding to each coordinate in the list of constraints. One or more "units" may be None, and units may only be associated with coordinate constraints. use_original_units: Boolean to state whether the coordinates used in the extraction should be converted back to their original units. The default is True, indicating that the units should be converted back to the original units. ignore_failure: Option to ignore constraint match failure and return the input cube. """ self._constraints = constraints self._units = units self._use_original_units = use_original_units self._ignore_failure = ignore_failure
[docs] def process(self, cube: Cube): """Perform the subcube extraction. Args: cube: The cube from which a subcube is to be extracted. Returns: A single cube matching the input constraints. Raises: ValueError: If the constraint(s) could not be matched to the input cube. """ cube = as_cube(cube) res = extract_subcube( cube, self._constraints, self._units, self._use_original_units ) if res is None: res = cube if not self._ignore_failure: raise ValueError("Constraint(s) could not be matched in input cube") return res
[docs] def thin_cube(cube: Cube, thinning_dict: Dict[str, int]) -> Cube: """ Thin the coordinate by taking every X points, defined in the thinning dict as {coordinate: X} Args: cube: The cube containing the coordinates to be thinned. thinning_dict: A dictionary of coordinate and the step value, i.e. a step of 2 will skip every other point Returns: A cube with thinned coordinates. """ coord_names = get_dim_coord_names(cube) slices = [slice(None, None, None)] * len(coord_names) for key, val in thinning_dict.items(): slices[coord_names.index(key)] = slice(None, None, val) return cube[tuple(slices)]
[docs] def cubelist_extract(cubes: CubeList, name: str): """Extract a cube from a CubeList based on provided name. Args: cubes: A CubeList. name: Name of the cube to be extracted. Returns: A single cube matching the input name. """ return cubes.extract_cube(name)
[docs] class ExtractLevel(BasePlugin): """ Class for extracting a pressure or height surface from data-on-levels. """
[docs] def __init__(self, positive_correlation: bool, value_of_level: float): """Sets up Class Args: positive_correlation: Set to True when the variable generally increases as pressure increase or when the variable generally increases as height decreases. value_of_level: The value of the input cube for which the pressure or height level is required """ self.positive_correlation = positive_correlation self.value_of_level = value_of_level self.coordinate = None
[docs] def coordinate_grid(self, variable_on_levels: Cube) -> np.ndarray: """Creates a pressure or height grid of the same shape as variable_on_levels cube. It is populated at every grid square and for every realization with a column of all pressure or height levels taken from variable_on_levels's pressure or height coordinate Args: variable_on_levels: Cube of some variable with pressure or height levels Returns: An n dimensional array with the same dimensions as variable_on_levels containing, at every grid square and for every realization, a column of all pressure or height levels taken from variable_on_levels's pressure or height coordinate """ required_shape = variable_on_levels.shape coordinate_points = variable_on_levels.coord(self.coordinate).points (coordinate_axis,) = variable_on_levels.coord_dims(self.coordinate) coordinate_shape = np.ones_like(required_shape) coordinate_shape[coordinate_axis] = required_shape[coordinate_axis] coordinate_array = np.broadcast_to( coordinate_points.reshape(coordinate_shape), required_shape ) return coordinate_array
[docs] def fill_invalid(self, cube: Cube): """Populate any invalid values in the source data with the neighbouring value in that column plus a small difference (determined by the least_significant_digit attribute, or 10^-2). This results in columns that do not have repeated values as repeated values confuse the stratify.interpolate method. Args: cube: Cube of variable on levels (3D) (modified in-place). """ if np.isfinite(cube.data).all() and not np.ma.is_masked(cube.data): return data = np.ma.masked_invalid(cube.data) (coordinate_axis,) = cube.coord_dims(self.coordinate) coordinate_points = cube.coord(self.coordinate).points # Find the least significant increment to use as an offset for filling missing values. # We don't really care so long as it is non-zero and has the same sign. increasing_order = np.all(np.diff(cube.coord(self.coordinate).points) > 0) sign = 1 if self.positive_correlation == increasing_order else -1 v_increment = sign * minimum_increment(cube, default=0.01) self._one_way_fill( data, coordinate_axis, coordinate_points, v_increment, reverse=True ) cube.data = data if np.isfinite(cube.data).all() and not np.ma.is_masked(cube.data): # We've filled everything in. No need to try again. return self._one_way_fill( data, coordinate_axis, coordinate_points, v_increment, reverse=False ) cube.data = data
[docs] @staticmethod def _one_way_fill( data: np.ma.MaskedArray, coordinate_axis: int, coordinate_points: np.ndarray, v_increment: np.ndarray, reverse: bool = False, ): """ Scans through the pressure or height axis forwards or backwards, filling any missing data with the previous value plus (or minus in reverse) the specified increment. Running this in both directions will therefore populate all columns so long as there is at least one valid data point to start with. """ last_slice = [slice(None)] * data.ndim if reverse: local_increment = -v_increment first = len(coordinate_points) - 2 last = -1 step = -1 last_slice[coordinate_axis] = slice( len(coordinate_points) - 1, len(coordinate_points) ) else: local_increment = v_increment first = 1 last = len(coordinate_points) step = 1 last_slice[coordinate_axis] = slice(0, 1) for c in range(first, last, step): c_slice = [slice(None)] * data.ndim c_slice[coordinate_axis] = slice(c, c + 1) data[*c_slice] = np.ma.where( data.mask[*c_slice], data[*last_slice] + local_increment, data[*c_slice] ) last_slice = c_slice
[docs] def _make_template_cube( self, result_data: np.array, variable_on_levels: Cube ) -> Cube: """Creates a cube of the variable on a pressure or height level based on the input cube""" template_cube = next(variable_on_levels.slices_over([self.coordinate])).copy( result_data ) if self.coordinate == "pressure": template_cube.rename( "pressure_of_atmosphere_at_" f"{self.value_of_level}{variable_on_levels.units}" ) else: template_cube.rename( "height_at_" f"{self.value_of_level}{variable_on_levels.units}" ) template_cube.units = variable_on_levels.coord(self.coordinate).units template_cube.remove_coord(self.coordinate) return template_cube
[docs] def process(self, variable_on_levels: Cube) -> Cube: """Extracts the pressure or height level (depending on which is present on the cube) where the environment variable first intersects self.value_of_level starting at a pressure or height value near the surface and ascending in altitude from there. Where the surface falls outside the available data, the maximum or minimum of the surface will be returned, even if the source data has no value at that point. Args: variable_on_levels: A cube of data on pressure or height levels Returns: A cube of the environment pressure or height at self.value_of_level Raises: NotImplementError: If variable_on_levels has both a height and pressure coordinate """ from stratify import EXTRAPOLATE_NEAREST, interpolate coord_list = [coord.name() for coord in variable_on_levels.coords()] if "height" in coord_list and "pressure" in coord_list: raise NotImplementedError( """Input Cube has both a pressure and height coordinate. Only one of these should be present on the cube.""" ) try: self.coordinate = variable_on_levels.coord("pressure").name() except CoordinateNotFoundError: self.coordinate = variable_on_levels.coord("height").name() self.fill_invalid(variable_on_levels) if "realization" in [c.name() for c in variable_on_levels.dim_coords]: slicer = variable_on_levels.slices_over((["realization"])) one_slice = variable_on_levels.slices_over((["realization"])).next() has_r_coord = True else: slicer = [variable_on_levels] one_slice = variable_on_levels has_r_coord = False grid = self.coordinate_grid(one_slice).astype(np.float32) (coordinate_axis,) = one_slice.coord_dims(self.coordinate) result_data = np.empty_like( variable_on_levels.slices_over(([self.coordinate])).next().data ) for i, zyx_slice in enumerate(slicer): interp_data = interpolate( np.array([self.value_of_level], dtype=np.float32), zyx_slice.data.data, grid, axis=coordinate_axis, rising=False, extrapolation=EXTRAPOLATE_NEAREST, ).squeeze(axis=coordinate_axis) if has_r_coord: result_data[i] = interp_data else: result_data = interp_data output_cube = self._make_template_cube(result_data, variable_on_levels) return output_cube