Source code for improver.wind_calculations.wind_downscaling

# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""Module containing wind downscaling plugins."""

import copy
import itertools
from typing import Optional, Tuple, Union

import iris
import numpy as np
from cf_units import Unit
from iris.cube import Cube
from iris.exceptions import CoordinateNotFoundError
from numpy import ndarray

from improver import BasePlugin, PostProcessingPlugin
from improver.constants import RMDI

# Scale parameter to determine reference height
ABSOLUTE_CORRECTION_TOL = 0.04

# Scaling parameter to determine reference height
HREF_SCALE = 2.0

# Von Karman's constant
VONKARMAN = 0.4

# Default roughness length for sea points
Z0M_SEA = 0.0001


[docs] class FrictionVelocity(BasePlugin): """Class to calculate the friction velocity. This holds the function to calculate the friction velocity u_star, given a reference height h_ref, the velocity at the reference height u_href and the surface roughness z_0. """
[docs] def __init__( self, u_href: ndarray, h_ref: ndarray, z_0: ndarray, mask: ndarray ) -> None: """Initialise the class. Args: u_href: A 2D array of float32 for the wind speed at h_ref h_ref: A 2D array of float32 for the reference heights z_0: A 2D array of float32 for the vegetative roughness lengths mask: A 2D array of booleans where True indicates calculate u* Notes: * z_0 and h_ref need to have identical units. * the calculated friction velocity will have the units of the supplied velocity u_href. """ self.u_href = u_href self.h_ref = h_ref self.z_0 = z_0 self.mask = mask # Check that input cubes are the same size array_sizes = [np.size(u_href), np.size(h_ref), np.size(z_0), np.size(mask)] if not all(x == array_sizes[0] for x in array_sizes): raise ValueError("Different size input arrays u_href, h_ref, z_0, mask")
[docs] def process(self) -> ndarray: """Function to calculate the friction velocity. ustar = K * (u_href / ln(h_ref / z_0)) where ustar is the friction velocity, K is Von Karman's constant, u_ref is the wind speed at the reference height, h_ref is the reference height and z_0 is the vegetative roughness length. Returns: A 2D array of float32 friction velocities """ ustar = np.full(self.u_href.shape, RMDI, dtype=np.float32) numerator = self.u_href[self.mask] with np.errstate(invalid="ignore"): denominator = np.log(self.h_ref[self.mask] / self.z_0[self.mask]) ustar[self.mask] = VONKARMAN * (numerator / denominator) return ustar
[docs] class RoughnessCorrectionUtilities: """Class to calculate the height and roughness wind corrections. This holds functions to calculate the roughness and height corrections given the ancil files: * standard deviation of height in grid cell as sigma (model grid on pp grid) * Silhouette roughness as a_over_s (model grid on pp grid) * vegetative roughness length z_0 (model grid on pp grid) * post-processing grid orography pporo * model grid orography interpolated on post-processing grid modoro * height level 3D/ 1D grid * windspeed 3D field on height level 3D grid (from above). """
[docs] def __init__( self, a_over_s: ndarray, sigma: ndarray, z_0: ndarray, pporo: ndarray, modoro: ndarray, ppres: float, modres: float, ) -> None: """Set up roughness and height correction. This sets up the parameters used for roughness and height correction given the ancillary file inputs: Args: a_over_s: 2D array float32 - Silhouette roughness field, dimensionless ancillary data, calculated according to Robinson, D. (2008) - Ancillary file creation for the UM, Unified Model Documentation Paper 73. sigma: 2D array float32 - Standard deviation field of height in the grid cell, units of length z_0: 2D array float32 - Vegetative roughness height field, units of length pporo: 2D array float32 - Post processing grid orography field modoro: 2D array float32 - Model orography field interpolated to post processing grid ppres: Float - Grid cell length of post processing grid modres: Float - Grid cell length of model grid """ self.a_over_s = a_over_s self.z_0 = z_0 self.pporo = pporo self.modoro = modoro self.h_over_2 = self.sigma2hover2(sigma) # half peak to trough height self.hcmask, self.rcmask = self._setmask() # HC mask, RC mask if self.z_0 is not None: self.z_0[z_0 <= 0] = Z0M_SEA self.dx_min = ppres / 2.0 # scales smaller than this not resolved in pp # the original code had hardcoded 500 self.dx_max = 3.0 * modres # scales larger than this resolved in model # the original code had hardcoded 4000 self.wavenum = self._calc_wav() # k = 2*pi / L self.h_ref = self._calc_h_ref() self._refinemask() # HC mask needs to be updated for missing orography self.h_at0 = self._delta_height() # pp orography - model orography
[docs] def _refinemask(self) -> None: """Remask over RMDI and NaN orography. The mask for HC needs to be False where either of the orographies (model or pp) has an invalid number. This cannot be done before because the mask is used to calculate the wavenumber which can and should be calculated for all points where h_over_2 and a_over_s is a valid number. """ self.hcmask[self.pporo == RMDI] = False self.hcmask[self.modoro == RMDI] = False self.hcmask[np.isnan(self.pporo)] = False self.hcmask[np.isnan(self.modoro)] = False
[docs] def _setmask(self) -> Tuple[ndarray, ndarray]: """Create a ~land-sea mask. Create a mask that is basically a land-sea mask: Both, the standard deviation and the silouette roughness, are 0 over the sea. A standard deviation of 0 results in a RMDI for h_over_2. Returns: - 2D array of booleans- True for land-points, false for Sea (HC) - 2D array of booleans- additionally False for invalid z_0 (RC) """ hcmask = np.full(self.h_over_2.shape, True, dtype=bool) hcmask[self.h_over_2 <= 0] = False hcmask[self.a_over_s <= 0] = False hcmask[np.isnan(self.h_over_2)] = False hcmask[np.isnan(self.a_over_s)] = False rcmask = np.copy(hcmask) if self.z_0 is not None: rcmask[self.z_0 <= 0] = False rcmask[np.isnan(self.z_0)] = False return hcmask, rcmask
[docs] @staticmethod def sigma2hover2(sigma: ndarray) -> ndarray: """Calculate the half peak-to-trough height. The ancillary data used to estimate the peak to trough height contains the standard deviation of height in a cell. For sine-waves, this relates to the amplitude of the wave as: Amplitude = sigma * sqrt(2) The amplitude would correspond to half the peak-to-trough height (h_o_2). Args: sigma: 2D array of float32 - standard deviation of height in grid cell. Returns: 2D array of float32 - half peak-to-trough height. Comments: Points that had sigma = 0 (i.e. sea points) are set to RMDI. """ h_o_2 = np.full(sigma.shape, RMDI, dtype=np.float32) h_o_2[sigma > 0] = sigma[sigma > 0] * np.sqrt(2.0) return h_o_2
[docs] def _calc_wav(self) -> ndarray: """Calculate wavenumber k of typical orographic lengthscale. Function to calculate wavenumber k of typical orographic lengthscale L: .. math:: :label: k = 2 * \\pi / L L is approximated from half the peak-to-trough height h_over_2 and the silhoutte roughness a_over_s (average of up-slopes per unit length over several cross-sections through a grid cell) as: .. math:: :label: L = 2 * \\rm{h\\_over\\_2} / \\rm{a\\_over\\_s} a_over_s is dimensionless since it is the sum of up-slopes measured in the same unit lengths as it is calculated over. h_over_2 is calculated from the standard deviation of height in a grid cell, sigma, as: .. math:: :label: \\rm{h\\_over\\_2} = \\sqrt{2} * \\rm{sigma} which is based on the assumptions of sine waves, see sigma2hover2. From eq. (1) and (2) it follows that: .. math:: :label: k = 2*\\pi / (2 * \\rm{h\\_over\\_2} / \\rm{a\\_over\\_s)} = \\rm{a\\_over\\_s} * \\pi / \\rm{h\\_over\\_2} Returns: 2D array float32 - wavenumber in units of inverse units of supplied h_over_2. """ wavn = np.full(self.a_over_s.shape, RMDI, dtype=np.float32) wavn[self.hcmask] = (self.a_over_s[self.hcmask] * np.pi) / self.h_over_2[ self.hcmask ] wavn[wavn > np.pi / self.dx_min] = np.pi / self.dx_min wavn[self.h_over_2 == 0] = RMDI wavn[abs(wavn) < np.pi / self.dx_max] = np.pi / self.dx_max return wavn
[docs] def _calc_h_ref(self) -> ndarray: """Calculate the reference height for roughness correction. The reference height below which the flow is in equilibrium with the vegetative roughness is proportional to 1/wavenum (Howard & Clark, 2007). Vosper (2009) and Clark (2009) argue that at the reference height, the perturbation should have decayed to a fraction epsilon (ABSOLUTE_CORRECTION_TOL). The factor alpha implements eq. 1.3 in Clark (2009): UK Climatology - Wind Screening Tool. See also Vosper (2009) for a motivation. For a freely available external reference, see the Virtual Met Mast Version 1 Methodology and Verification paper under www.thecrownestate.co.uk. alpha is the log of scale parameter to determine reference height which is currently set to 0.04 (this corresponds to epsilon in both Vosper and Clark) Returns: 2D array float32 - reference height for roughness correction """ alpha = -np.log(ABSOLUTE_CORRECTION_TOL) tunable_param = np.full(self.wavenum.shape, RMDI, dtype=np.float32) h_ref = np.full(self.wavenum.shape, RMDI, dtype=np.float32) tunable_param[self.hcmask] = alpha + np.log( self.wavenum[self.hcmask] * self.h_over_2[self.hcmask] ) tunable_param[tunable_param > 1.0] = 1.0 tunable_param[tunable_param < 0.0] = 0.0 h_ref[self.hcmask] = tunable_param[self.hcmask] / self.wavenum[self.hcmask] h_ref[h_ref < 1.0] = 1.0 h_ref = np.minimum(h_ref, HREF_SCALE * self.h_over_2) h_ref[h_ref < 1.0] = 1.0 h_ref[~self.hcmask] = 0.0 return h_ref
[docs] def calc_roughness_correction( self, hgrid: ndarray, uold: ndarray, mask: ndarray ) -> ndarray: """Function to perform the roughness correction. Args: hgrid: 3D or 1D array float32 - height above orography uold: 3D array float32 - original velocities at hgrid. mask: 2D array of bools that is True for land-points, False for Sea and False for invalid z_0. Returns: 3D np.array float32 - Corrected wind speed on hgrid. Above href, this is equal to uold. Comments: Replace the windspeed profile below the reference height with one that increases logarithmically with height, bound by the original velocity uhref at the reference height h_ref and by a 0 velocity at the vegetative roughness height z_0 """ uhref = self._calc_u_at_h(uold, hgrid, self.h_ref, mask) if hgrid.ndim == 1: hgrid = hgrid[np.newaxis, np.newaxis, :] ustar = FrictionVelocity(uhref, self.h_ref, self.z_0, mask)() unew = np.copy(uold) mhref = self.h_ref mhref[~mask] = RMDI cond = hgrid < self.h_ref[:, :, np.newaxis] # Create array of ones. arr_ones = np.ones(unew.shape, dtype=np.float32) first_arg = (ustar[:, :, np.newaxis] * arr_ones)[cond] sec_arg = np.log( hgrid / (np.reshape(self.z_0, self.z_0.shape + (1,)) * arr_ones) )[cond] unew[cond] = (first_arg * sec_arg) / VONKARMAN return unew
[docs] def _calc_u_at_h( self, u_in: ndarray, h_in: ndarray, hhere: ndarray, mask: ndarray, dolog: bool = False, ) -> ndarray: """Function to interpolate u_in on h_in at hhere. Args: u_in: 3D array float32 - velocity on h_in layer, last dim is height h_in: 3D or 1D array float32 - height layer array hhere: 2D array float32 - height grid to interpolate at mask: 2D array of bools - mask the final result for uath dolog: if True, log interpolation, default False Returns: 2D array float32 - velocity interpolated at h """ u_in = np.ma.masked_less(u_in, 0.0) h_in = np.ma.masked_less(h_in, 0.0) # h_in.mask = u_in.mask # If I allow 1D height grids, I think I cannot do the hop over. # Ignores the height at the position where u_in is RMDI,"hops over" hhere = np.ma.masked_less(hhere, 0.0) upidx = np.argmax(h_in > hhere[:, :, np.newaxis], axis=2) # loidx = np.maximum(upidx-1, 0) #if RMDI, need below loidx = np.argmin( np.ma.masked_less(hhere[:, :, np.newaxis] - h_in, 0.0), axis=2 ) if h_in.ndim == 3: hup = h_in.take( upidx.flatten() + np.arange(0, upidx.size * h_in.shape[2], h_in.shape[2]) ) hlow = h_in.take( loidx.flatten() + np.arange(0, loidx.size * h_in.shape[2], h_in.shape[2]) ) elif h_in.ndim == 1: hup = h_in[upidx].flatten() hlow = h_in[loidx].flatten() uup = u_in.take( upidx.flatten() + np.arange(0, upidx.size * u_in.shape[2], u_in.shape[2]) ) ulow = u_in.take( loidx.flatten() + np.arange(0, loidx.size * u_in.shape[2], u_in.shape[2]) ) mask = mask.flatten() uath = np.full(mask.shape, RMDI, dtype=np.float32) if dolog: uath[mask] = self._interpolate_log( hup[mask], hlow[mask], hhere.flatten()[mask], uup[mask], ulow[mask] ) else: uath[mask] = self._interpolate_1d( hup[mask], hlow[mask], hhere.flatten()[mask], uup[mask], ulow[mask] ) uath = np.reshape(uath, hhere.shape) return uath
[docs] @staticmethod def _interpolate_1d( xup: ndarray, xlow: ndarray, at_x: ndarray, yup: ndarray, ylow: ndarray ) -> ndarray: """Simple 1D linear interpolation for 2D grid inputs level. Args: xup: 2D array float32 - upper x-bins xlow: 2D array float32 - lower x-bins at_x: 2D array float32 - x values to interpolate y at yup: 2D array float32 - y(xup) ylow: 2D array float32 - y(xlow) Returns: 2D array float32 - y(at_x) assuming a lin function between xlow and xup """ interp = np.full(xup.shape, RMDI, dtype=np.float32) diffs = xup - xlow interp[diffs != 0] = ylow[diffs != 0] + ( (at_x[diffs != 0] - xlow[diffs != 0]) / diffs[diffs != 0] * (yup[diffs != 0] - ylow[diffs != 0]) ) interp[diffs == 0] = at_x[diffs == 0] / xup[diffs == 0] * (yup[diffs == 0]) return interp
[docs] @staticmethod def _interpolate_log( xup: ndarray, xlow: ndarray, at_x: ndarray, yup: ndarray, ylow: ndarray ) -> ndarray: """Simple 1D log interpolation y(x), except if lowest layer is ground level. Args: xup: 2D array float32 - upper x-bins xlow: 2D array float32 - lower x-bins at_x: 2D array float32 - x values to interpolate y at yup: 2D array float32 - y(xup) ylow: 2D array float32 -y(xlow) Returns: 2D array float32 - y(at_x) assuming a log function between xlow and xup """ ain = np.full(xup.shape, RMDI, dtype=np.float32) loginterp = np.full(xup.shape, RMDI, dtype=np.float32) mfrac = xup / xlow mtest = (xup / xlow != 1) & (at_x != xup) ain[mtest] = (yup[mtest] - ylow[mtest]) / np.log(mfrac[mtest]) loginterp[mtest] = ain[mtest] * np.log(at_x[mtest] / xup[mtest]) + yup[mtest] mtest = xup / xlow == 1 # below lowest layer, make lin interp loginterp[mtest] = at_x[mtest] / xup[mtest] * (yup[mtest]) mtest = at_x == xup # just use yup loginterp[mtest] = yup[mtest] return loginterp
[docs] def _calc_height_corr( self, u_a: ndarray, heightg: ndarray, mask: ndarray, onemfrac: Union[float, ndarray], ) -> ndarray: """Function to calculate the additive height correction. Args: u_a: 2D array float32 - outer velocity, e.g. velocity at h_ref_orig heightg: 1D or 3D array float32 - heights above orography mask: 3D array of bools - Masks the hc_add result onemfrac: Currently, scalar = 1. But can be a function of position and height, e.g. a 3D array (float32) Returns: 3D array float32 - additive height correction to wind speed Comments: The height correction is a disturbance of the flow that decays exponentially with height. The larger the vertical offset h_at0 (the higher the unresolved hill), the larger is the disturbance. The more smooth the disturbance (the larger the horizontal scale of the disturbance), the smaller the height correction (hence, a larger wavenumber results in a larger disturbance). hc_add = exp(-height*wavenumber)*u(href)*h_at_0*wavenumber A final factor of 1 is assumed and omitted for the Bessel function term. """ (xdim, ydim) = u_a.shape if heightg.ndim == 1: zdim = heightg.shape[0] heightg = heightg[np.newaxis, np.newaxis, :] elif heightg.ndim == 3: zdim = heightg.shape[2] ml2 = self.h_at0 * self.wavenum expon = np.ones([xdim, ydim, zdim], dtype=np.float32) mult = self.wavenum[:, :, np.newaxis] * heightg expon[mult > 0.0001] = np.exp(-mult[mult > 0.0001]) hc_add = expon * u_a[:, :, np.newaxis] * ml2[:, :, np.newaxis] * onemfrac hc_add[~mask, :] = 0 return hc_add
[docs] def _delta_height(self) -> ndarray: """Function to calculate pp-grid diff from model grid. Calculate the difference between pp-grid height and model grid height. Returns: 2D array float32 - height difference, ppgrid-model """ delt_z = np.full(self.pporo.shape, RMDI, dtype=np.float32) delt_z[self.hcmask] = self.pporo[self.hcmask] - self.modoro[self.hcmask] return delt_z
[docs] def do_rc_hc_all(self, hgrid: ndarray, uorig: ndarray) -> ndarray: """Function to call HC and RC (height and roughness corrections). Args: hgrid: 1D or 3D array float32 - height grid of wind input uorig: 3D array float32 - wind speed on these levels Returns: - RC corrected windspeed - HC additional part Friedrich, M. M., 2016 Wind Downscaling Program (Internal Met Office Report) """ if hgrid.ndim == 3: condition1 = (hgrid == RMDI).any(axis=2) self.hcmask[condition1] = False self.rcmask[condition1] = False mask_rc = np.copy(self.rcmask) mask_rc[(uorig == RMDI).any(axis=2)] = False mask_hc = np.copy(self.hcmask) mask_hc[(uorig == RMDI).any(axis=2)] = False if self.z_0 is not None: unew = self.calc_roughness_correction(hgrid, uorig, mask_rc) else: unew = uorig uhref_orig = self._calc_u_at_h(uorig, hgrid, 1.0 / self.wavenum, mask_hc) mask_hc[uhref_orig <= 0] = False # Setting this value to 1, is equivalent to setting the # Bessel function to 1. (Friedrich, 2016) # Example usage if the Bessel function was not set to 1 is: # onemfrac = 1.0 - BfuncFrac(nx,ny,nz,heightvec,z_0,waveno, Ustar, UI) onemfrac = 1.0 hc_add = self._calc_height_corr(uhref_orig, hgrid, mask_hc, onemfrac) result = unew + hc_add result[result < 0.0] = 0 # HC can be negative if pporo<modeloro return result.astype(np.float32)
[docs] class RoughnessCorrection(PostProcessingPlugin): """Plugin to orographically-correct 3d wind speeds.""" zcoordnames = ["height", "model_level_number"] tcoordnames = ["time", "forecast_time"]
[docs] def __init__( self, a_over_s_cube: Cube, sigma_cube: Cube, pporo_cube: Cube, modoro_cube: Cube, modres: float, z0_cube: Cube = None, height_levels_cube: Optional[Cube] = None, ) -> None: """Initialise the RoughnessCorrection instance. Args: a_over_s_cube: 2D - model silhouette roughness on pp grid. dimensionless sigma_cube: 2D - standard deviation of model orography height on pp grid. In m. pporo_cube: 2D - pp orography. In m modoro_cube: 2D - model orography interpolated on pp grid. In m modres: original average model resolution in m height_levels_cube: 3D or 1D - height of input velocity field. Can be position dependent z0_cube: 2D - vegetative roughness length in m. If not given, do not do any RC """ # Standard Python 'float' type is either single or double depending on # system and there is no reliable method of finding which from the # variable. So force to numpy.float32 by default. modres = np.float32(modres) x_name, y_name, _, _ = self.find_coord_names(pporo_cube) # Some checking that all the grids match if not ( self.check_ancils( a_over_s_cube, sigma_cube, z0_cube, pporo_cube, modoro_cube ) ): raise ValueError("ancillary grids are not consistent") # I want this ordering. Careful: iris.cube.Cube.slices is unreliable. self.a_over_s = next(a_over_s_cube.slices([y_name, x_name])) self.sigma = next(sigma_cube.slices([y_name, x_name])) try: self.z_0 = next(z0_cube.slices([y_name, x_name])) except AttributeError: self.z_0 = z0_cube self.pp_oro = next(pporo_cube.slices([y_name, x_name])) self.model_oro = next(modoro_cube.slices([y_name, x_name])) self.ppres = self.calc_av_ppgrid_res(pporo_cube) self.modres = modres self.height_levels = height_levels_cube self.x_name = None self.y_name = None self.z_name = None self.t_name = None
[docs] def find_coord_names(self, cube: Cube) -> Tuple[str, str, str, str]: """Extract x, y, z, and time coordinate names. Args: cube: some iris cube to find coordinate names from Returns: - name of the axis name in x-direction - name of the axis name in y-direction - name of the axis name in z-direction - name of the axis name in t-direction """ clist = {cube.coords()[i].name() for i in range(len(cube.coords()))} try: xname = cube.coord(axis="x").name() except CoordinateNotFoundError as exc: print("'{0}' while xname setting. Args: {1}.".format(exc, exc.args)) try: yname = cube.coord(axis="y").name() except CoordinateNotFoundError as exc: print("'{0}' while yname setting. Args: {1}.".format(exc, exc.args)) if clist.intersection(self.zcoordnames): zname = list(clist.intersection(self.zcoordnames))[0] else: zname = None if clist.intersection(self.tcoordnames): tname = list(clist.intersection(self.tcoordnames))[0] else: tname = None return xname, yname, zname, tname
[docs] def calc_av_ppgrid_res(self, a_cube: Cube) -> float: """Calculate average grid resolution from a cube. Args: a_cube: Cube to calculate average resolution of Returns: Average grid resolution. """ x_name, y_name, _, _ = self.find_coord_names(a_cube) [exp_xname, exp_yname] = ["projection_x_coordinate", "projection_y_coordinate"] exp_unit = Unit("m") if (x_name != exp_xname) or (y_name != exp_yname): raise ValueError("cannot currently calculate resolution") if a_cube.coord(x_name).bounds is None and a_cube.coord(y_name).bounds is None: xres = (np.diff(a_cube.coord(x_name).points)).mean() yres = (np.diff(a_cube.coord(y_name).points)).mean() else: xres = (np.diff(a_cube.coord(x_name).bounds)).mean() yres = (np.diff(a_cube.coord(y_name).bounds)).mean() if (a_cube.coord(x_name).units != exp_unit) or ( a_cube.coord(y_name).units != exp_unit ): raise ValueError("cube axis have units different from m.") return (abs(xres) + abs(yres)) / 2.0
[docs] @staticmethod def check_ancils( a_over_s_cube: Cube, sigma_cube: Cube, z0_cube: Optional[Cube], pp_oro_cube: Cube, model_oro_cube: Cube, ) -> ndarray: """Check ancils grid and units. Check if ancil cubes are on the same grid and if they have the expected units. The testing for "same grid" might be replaced if there is a general utils function made for it or so. Args: a_over_s_cube: holding the silhouette roughness field sigma_cube: holding the standard deviation of height in a grid cell z0_cube: holding the vegetative roughness field pp_oro_cube: holding the post processing grid orography model_oro_cube: holding the model orography on post processing grid Returns: Containing bools describing whether or not the tests passed """ ancil_list = [a_over_s_cube, sigma_cube, pp_oro_cube, model_oro_cube] unwanted_coord_list = [ "time", "height", "model_level_number", "forecast_time", "forecast_reference_time", "forecast_period", ] for field, exp_unit in zip(ancil_list, [1, Unit("m"), Unit("m"), Unit("m")]): for unwanted_coord in unwanted_coord_list: try: field.remove_coord(unwanted_coord) except CoordinateNotFoundError: pass if field.units != exp_unit: msg = ( "{} ancil field has unexpected unit:" " {} (expected) vs. {} (actual)" ) raise ValueError(msg.format(field.name(), exp_unit, field.units)) if z0_cube is not None: ancil_list.append(z0_cube) for unwanted_coord in unwanted_coord_list: try: z0_cube.remove_coord(unwanted_coord) except CoordinateNotFoundError: pass if z0_cube.units != Unit("m"): msg = "z0 ancil has unexpected unit: should be {} is {}" raise ValueError(msg.format(Unit("m"), z0_cube.units)) permutated_ancil_list = list(itertools.permutations(ancil_list, 2)) oklist = [] for entry in permutated_ancil_list: x_axis_flag = entry[0].coord(axis="y") == entry[1].coord(axis="y") y_axis_flag = entry[0].coord(axis="x") == entry[1].coord(axis="x") oklist.append(x_axis_flag & y_axis_flag) # HybridHeightToPhenomOnPressure._cube_compatibility_check(entr[0], # entr[1]) return np.array(oklist).all() # replace by a return value of True
[docs] def find_coord_order(self, mcube: Cube) -> Tuple[int, int, int, int]: """Extract coordinate ordering within a cube. Use coord_dims to assess the dimension associated with a particular dimension coordinate. If a coordinate is not a dimension coordinate, then a NaN value will be returned for that coordinate. Args: mcube: cube to check the order of coordinate axis Returns: - position of x axis. - position of y axis. - position of z axis. - position of t axis. """ coord_names = [self.x_name, self.y_name, self.z_name, self.t_name] positions = len(coord_names) * [np.nan] for coord_index, coord_name in enumerate(coord_names): if mcube.coords(coord_name, dim_coords=True): (positions[coord_index],) = mcube.coord_dims(coord_name) return positions
[docs] def find_heightgrid(self, wind: Cube) -> ndarray: """Setup the height grid. Setup the height grid either from the 1D or 3D height grid that was supplied to the plugin or from the z-axis information from the wind grid. Args: wind: 3D or 4D - representing the wind data. Returns: 1D or 3D array - representing the height grid. """ if self.height_levels is None: hld = wind.coord(self.z_name).points else: hld = iris.util.squeeze(self.height_levels) if np.isnan(hld.data).any() or (hld.data == RMDI).any(): raise ValueError("height grid contains invalid points") if hld.ndim == 3: try: xap, yap, zap, _ = self.find_coord_order(hld) hld.transpose([yap, xap, zap]) except KeyError: raise ValueError("height grid different from wind grid") elif hld.ndim == 1: try: hld = next(hld.slices([self.z_name])) except CoordinateNotFoundError: raise ValueError("height z coordinate differs from wind z") else: raise ValueError( "hld must have a dimension length of " "either 3 or 1" "hld.ndim is {}".format(hld.ndim) ) hld = hld.data return hld
[docs] def check_wind_ancil(self, xwp: int, ywp: int) -> None: """Check wind vs ancillary file grids. Check if wind and ancillary files are on the same grid and if they have the same ordering. Args: xwp: representing the position of the x-axis in the wind cube ywp: representing the position of the y-axis of the wind cube """ xap, yap, _, _ = self.find_coord_order(self.pp_oro) if xwp - ywp != xap - yap: if np.isnan(xap) or np.isnan(yap): raise ValueError("ancillary grid different from wind grid") raise ValueError("xy-orientation: ancillary differ from wind")
[docs] def process(self, input_cube: Cube) -> Cube: """Adjust the 4d wind field - cube - (x, y, z including times). Args: input_cube: The wind cube to be operated upon. Should be wind speed on height_levels for all desired forecast times. Returns: The 4d wind field with roughness and height correction applied in the same order as the input cube. Raises: TypeError: If input_cube is not a cube. """ if not isinstance(input_cube, iris.cube.Cube): msg = "wind input is not a cube, but {}" raise TypeError(msg.format(type(input_cube))) (self.x_name, self.y_name, self.z_name, self.t_name) = self.find_coord_names( input_cube ) xwp, ywp, zwp, twp = self.find_coord_order(input_cube) if np.isnan(twp): input_cube.transpose([ywp, xwp, zwp]) else: input_cube.transpose([ywp, xwp, zwp, twp]) # problems with slices rchc_list = iris.cube.CubeList() if self.z_0 is None: z0_data = None else: z0_data = self.z_0.data roughness_correction = RoughnessCorrectionUtilities( self.a_over_s.data, self.sigma.data, z0_data, self.pp_oro.data, self.model_oro.data, self.ppres, self.modres, ) self.check_wind_ancil(xwp, ywp) hld = self.find_heightgrid(input_cube) for time_slice in input_cube.slices_over("time"): if np.isnan(time_slice.data).any() or (time_slice.data < 0.0).any(): msg = "{} has invalid wind data" raise ValueError(msg.format(time_slice.coord(self.t_name))) rc_hc = copy.deepcopy(time_slice) rc_hc.data = roughness_correction.do_rc_hc_all(hld, time_slice.data) rchc_list.append(rc_hc) output_cube = rchc_list.merge_cube() # reorder input_cube and output_cube as original if np.isnan(twp): input_cube.transpose(np.argsort([ywp, xwp, zwp])) output_cube.transpose(np.argsort([ywp, xwp, zwp])) else: input_cube.transpose(np.argsort([ywp, xwp, zwp, twp])) output_cube.transpose(np.argsort([twp, ywp, xwp, zwp])) return output_cube