# (C) Crown Copyright, Met Office. All rights reserved.
#
# This file is part of 'IMPROVER' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
"""init for cli and clize"""
import pathlib
import shlex
import time
from collections import OrderedDict
from functools import partial
import clize
from clize import parameters
from clize.help import ClizeHelp, HelpForAutodetectedDocstring
from clize.parser import value_converter
from clize.runner import Clize
from sigtools.wrappers import decorator
# Imports are done in their functions to make calls to -h quicker.
# selected clize imports/constants
# Short module-level aliases for flags defined on clize.Parameter.
IGNORE = clize.Parameter.IGNORE
LAST_OPTION = clize.Parameter.LAST_OPTION
REQUIRED = clize.Parameter.REQUIRED
UNDOCUMENTED = clize.Parameter.UNDOCUMENTED
# help helpers
[docs]
def docutilize(obj):
    """Convert a NumPy or Google style docstring into reStructuredText.

    Args:
        obj (str or object):
            Either the docstring text itself, or an object whose docstring
            should be converted in place.

    Returns:
        str or object:
            The converted text when a string was supplied, otherwise the
            same object with its ``__doc__`` replaced by the converted text.
    """
    from inspect import cleandoc, getdoc

    from sphinx.ext.napoleon.docstring import GoogleDocstring, NumpyDocstring

    given_text = isinstance(obj, str)
    text = cleandoc(obj) if given_text else getdoc(obj)

    # Run the NumPy parser first and the Google parser second.
    for parser in (NumpyDocstring, GoogleDocstring):
        text = str(parser(text))

    # Strip or rename the roles/fields emitted by napoleon, in this order.
    substitutions = (
        (":exc:", ""),
        (":data:", ""),
        (":keyword", ":param"),
        (":kwtype", ":type"),
    )
    for old, new in substitutions:
        text = text.replace(old, new)

    if given_text:
        return text
    obj.__doc__ = text
    return obj
[docs]
class HelpForNapoleonDocstring(HelpForAutodetectedDocstring):
    """Help builder that runs docstrings through `docutilize` first."""

    def add_docstring(self, docstring, *args, **kwargs):
        """Convert the docstring, then hand it to the parent implementation.

        All other positional and keyword arguments are forwarded unchanged.
        """
        converted = docutilize(docstring)
        super().add_docstring(converted, *args, **kwargs)
[docs]
class DocutilizeClizeHelp(ClizeHelp):
    """ClizeHelp variant whose default builder is the Napoleon-aware one."""

    def __init__(self, subject, owner, builder=HelpForNapoleonDocstring.from_subject):
        # Only the default value of `builder` differs from the parent class
        # call; everything is forwarded as given.
        super(DocutilizeClizeHelp, self).__init__(subject, owner, builder)
# input handling
[docs]
class ObjectAsStr(str):
    """Hide object under a string to pass it through Clize parser.

    The instance behaves as a ``str`` holding a display name, while the
    wrapped object is kept on the ``original_object`` attribute.
    """

    __slots__ = ("original_object",)

    def __new__(cls, obj, name=None):
        """Wrap `obj`, using `name` (or a generated name) as the string value.

        Args:
            obj: Object to hide behind the string.
            name (str, optional): String value to expose. When omitted it is
                generated with `obj_to_name`.

        Returns:
            ObjectAsStr: `obj` itself if it is already wrapped, otherwise a
            new wrapper.
        """
        if isinstance(obj, cls):  # pass object through if already wrapped
            return obj
        if name is None:
            name = cls.obj_to_name(obj)
        self = str.__new__(cls, name)
        self.original_object = obj
        return self

    def __hash__(self):
        """Return a hash that differs from the plain string's hash."""
        # make sure our hash doesn't clash with normal string hash.
        # `super().__hash__` is already bound to self, so it takes no
        # argument; passing self again raises TypeError.
        return super().__hash__() ^ hash(type(self))

    @staticmethod
    def obj_to_name(obj, cls=None):
        """Helper function to create the string.

        Args:
            obj: Object to name. Strings are returned unchanged.
            cls (type, optional): Class used for the module and class name
                in the result. Defaults to ``type(obj)``.

        Returns:
            str: `obj` if it is a string, otherwise a name of the form
            ``<module.ClassName@number>``.
        """
        if isinstance(obj, str):
            return obj
        if cls is None:
            cls = type(obj)
        try:
            obj_id = hash(obj)
        except TypeError:
            # Unhashable objects fall back to their identity.
            obj_id = id(obj)
        return "<%s.%s@%i>" % (cls.__module__, cls.__name__, obj_id)
[docs]
def maybe_coerce_with(converter, obj, **kwargs):
    """Run `converter` on string input; return anything else untouched.

    If `obj` carries an ``original_object`` attribute, that attribute is
    used in place of `obj`. Extra keyword arguments are passed to
    `converter`.
    """
    unwrapped = getattr(obj, "original_object", obj)
    if not isinstance(unwrapped, str):
        return unwrapped
    return converter(unwrapped, **kwargs)
[docs]
@value_converter
def comma_separated_list(to_convert):
    """Split a comma separated string into a list.

    Args:
        to_convert (str or list):
            Comma separated string, or an object to pass through.

    Returns:
        list:
            The pieces of the string, or the passed object if it was not
            a string.
    """

    def _split_on_commas(text):
        return text.split(",")

    return maybe_coerce_with(_split_on_commas, to_convert)
[docs]
@value_converter
def comma_separated_list_of_float(to_convert):
    """Split a comma separated string into a list of floats.

    Args:
        to_convert (str or list):
            Comma separated string, or an object to pass through.

    Returns:
        list:
            The pieces of the string converted with ``float``, or the
            passed object if it was not a string.
    """

    def _to_floats(text):
        return list(map(float, text.split(",")))

    return maybe_coerce_with(_to_floats, to_convert)
# output handling
[docs]
@decorator
def with_output(
    wrapped,
    *args,
    output=None,
    pass_through_output=False,
    compression_level=1,
    least_significant_digit: int = None,
    **kwargs,
):
    """Add `output` keyword only argument.
    Add `compression_level` option.
    Add `least_significant_digit` option.

    This is used to add extra `output`, `compression_level` and `least_significant_digit` CLI
    options. If `output` is provided, it saves the result of calling `wrapped` to file and returns
    None, otherwise it returns the result. If `compression_level` is provided, it compresses the
    data with the provided compression level (or not, if `compression_level` 0). If
    `least_significant_digit` provided, it will quantize the data to a certain number of
    significant figures.

    Args:
        wrapped (obj):
            The function to be wrapped.
        output (str, optional):
            Output file name. If not supplied, the output object will be
            printed instead.
        pass_through_output (bool):
            Pass through the output object even if saved to file.
            Used in pipelines of commands if intermediate output needs to be saved.
        compression_level (int):
            Will set the compression level (1 to 9), or disable compression (0).
        least_significant_digit (int):
            If specified will truncate the data to a precision given by
            10**(-least_significant_digit), e.g. if least_significant_digit=2, then the data will
            be quantized to a precision of 0.01 (10**(-2)). See
            http://www.esrl.noaa.gov/psd/data/gridded/conventions/cdc_netcdf_standard.shtml
            for details. When used with `compression level`, this will result in lossy
            compression.

    Returns:
        Result of calling `wrapped` or None if `output` is given. When the
        result is saved as netCDF and `pass_through_output` is set, the
        result is returned wrapped in an ObjectAsStr named after `output`.
    """
    import joblib
    from iris.cube import Cube, CubeList

    from improver.utilities.save import save_netcdf

    def _holds_only_cubes(candidate):
        """Tell whether candidate is a Cube/CubeList or an iterable of Cubes."""
        if isinstance(candidate, (Cube, CubeList)):
            return True
        try:
            members = iter(candidate)
        except TypeError:
            # Not iterable at all: it cannot be a collection of Cubes, so the
            # caller falls through to the pickle branch instead of crashing.
            return False
        return all(isinstance(member, Cube) for member in members)

    result = wrapped(*args, **kwargs)

    # Without an output file name, or with an empty result, nothing is saved.
    if not (output and result):
        return result

    # If result is a Cube or CubeList or an iterable containing only Cubes,
    # save as netCDF
    if _holds_only_cubes(result):
        save_netcdf(result, output, compression_level, least_significant_digit)
        if pass_through_output:
            return ObjectAsStr(result, output)
        return None

    # If output is set and result exists but is not a Cube, save it as a pickle file
    joblib.dump(result, output, compress=compression_level)
    return None
# cli object creation
[docs]
def clizefy(obj=None, helper_class=DocutilizeClizeHelp, **kwargs):
    """Decorator for creating CLI objects.

    Called without `obj`, it returns a partial of itself holding the given
    options, so it can be used as ``@clizefy(...)``. Objects that already
    have a ``cli`` attribute are returned unchanged. Callables are passed to
    ``Clize.keep`` with `helper_class`; anything else is passed to
    ``Clize.get_cli``.
    """
    if obj is None:
        return partial(clizefy, helper_class=helper_class, **kwargs)
    if hasattr(obj, "cli"):
        return obj
    if callable(obj):
        return Clize.keep(obj, helper_class=helper_class, **kwargs)
    return Clize.get_cli(obj, **kwargs)
# help command
[docs]
@clizefy(help_names=())
def improver_help(prog_name: parameters.pass_name, command=None, *, usage=False):
    """Show command help."""
    # Only the first word of the program name is passed on.
    program = prog_name.split()[0]
    # Drop the entries that are unset: a missing command and a False usage.
    help_args = [
        token for token in (command, "--help", usage and "--usage") if token
    ]
    output = execute_command(SUBCOMMANDS_DISPATCHER, program, *help_args)
    if command or not usage:
        return output
    # For the top-level usage listing, hide the lines ending in the help
    # option's own usage.
    kept_lines = [
        line
        for line in output.splitlines()
        if not line.endswith("--help [--usage]")
    ]
    return "\n".join(kept_lines)
[docs]
def command_executor(*argv, verbose=False, dry_run=False):
    """Run a command through the module's sub-command dispatcher.

    Args:
        *argv: Program name followed by the command and its arguments,
            forwarded positionally to `execute_command`.
        verbose (bool): Forwarded to `execute_command`.
        dry_run (bool): Forwarded to `execute_command`.

    Returns:
        Whatever `execute_command` returns.
    """
    options = {"verbose": verbose, "dry_run": dry_run}
    return execute_command(SUBCOMMANDS_DISPATCHER, *argv, **options)
[docs]
def _cli_items():
    """Yield ``(name, cli)`` pairs for the help command and each CLI module.

    The first pair is the built-in ``help`` command. After that, every
    module found in the ``improver.cli`` package path except ``__main__``
    is imported and its ``process`` attribute is passed through `clizefy`.
    """
    import importlib
    import pkgutil

    from improver.cli import __path__ as improver_cli_pkg_path

    yield ("help", improver_help)
    module_names = (
        info.name for info in pkgutil.iter_modules(improver_cli_pkg_path)
    )
    for name in module_names:
        if name == "__main__":
            continue
        module = importlib.import_module("improver.cli." + name)
        yield (name, clizefy(module.process))
# Table of (name, CLI object) pairs produced by _cli_items(), sorted and
# stored in an OrderedDict. Built at import time.
SUBCOMMANDS_TABLE = OrderedDict(sorted(_cli_items()))
# main CLI object with subcommands
# The table is passed to clizefy together with the description and
# footnotes for the top-level help.
SUBCOMMANDS_DISPATCHER = clizefy(
    SUBCOMMANDS_TABLE,
    description="""IMPROVER NWP post-processing toolbox""",
    footnotes="""See also improver --help for more information.""",
)
# IMPROVER top level main
[docs]
def unbracket(args):
    """Turn a flat argument list with bracket tokens into nested lists.

    A ``"["`` item opens a nested list and a ``"]"`` item closes the most
    recently opened one; every other item is kept as is.

    >>> unbracket("foo [ bar a b ] [ baz c ] -o z".split())
    ['foo', ['bar', 'a', 'b'], ['baz', 'c'], '-o', 'z']

    Raises:
        ValueError: If a closing bracket has no matching opening bracket,
            or an opening bracket is never closed.
    """
    mismatch_msg = "Mismatched bracket at position %i."
    top_level = []
    # The last entry is the list currently being filled; the first entry is
    # always the top-level result.
    open_lists = [top_level]
    for position, token in enumerate(args):
        if token == "[":
            nested = []
            open_lists[-1].append(nested)
            open_lists.append(nested)
        elif token == "]":
            if len(open_lists) == 1:
                raise ValueError(mismatch_msg % position)
            open_lists.pop()
        else:
            open_lists[-1].append(token)
    if len(open_lists) > 1:
        raise ValueError(mismatch_msg % len(args))
    return top_level
[docs]
class TimeIt:
    """Context manager recording how long its ``with`` body took.

    The duration is the difference between two ``time.perf_counter()``
    readings, taken on entry and on exit. With ``verbose=True`` the
    duration is printed on exit.
    """

    def __init__(self, verbose=False):
        self._verbose = verbose
        self._start = None
        self._elapsed = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc_info):
        finished = time.perf_counter()
        self._elapsed = finished - self._start
        if not self._verbose:
            return
        print(self)

    @property
    def elapsed(self):
        """Return elapsed time in seconds (None before the block has run)."""
        return self._elapsed

    def __str__(self):
        """Format the elapsed time in seconds."""
        return f"Run-time: {self._elapsed}s"
[docs]
def execute_command(dispatcher, prog_name, *args, verbose=False, dry_run=False):
    """Prepare the arguments and run them through `dispatcher`.

    List or tuple arguments are treated as nested commands and executed
    first; their result takes the argument's place. Path arguments become
    plain strings, and any remaining non-string value is wrapped in
    ObjectAsStr.

    Args:
        dispatcher: Callable invoked as ``dispatcher(prog_name, *args)``.
        prog_name (str): Program name passed as the first argument.
        *args: Command name and arguments, possibly nested.
        verbose (bool): Print the command line (and, when executing, the
            run time and the name of a non-None result).
        dry_run (bool): Do not call the dispatcher; return the prepared
            argument list instead.

    Returns:
        The dispatcher's result, or the prepared argument list on a dry run.
    """
    prepared = []
    for item in args:
        if isinstance(item, (list, tuple)):
            # process nested commands recursively
            item = execute_command(
                dispatcher, prog_name, *item, verbose=verbose, dry_run=dry_run
            )
        if isinstance(item, pathlib.PurePath):
            item = str(item)
        elif not isinstance(item, str):
            item = ObjectAsStr(item)
        prepared.append(item)

    quoted = [shlex.quote(part) for part in (prog_name, *prepared)]
    msg = " ".join(quoted)

    if dry_run:
        if verbose:
            print(msg)
        return prepared

    with TimeIt() as timeit:
        result = dispatcher(prog_name, *prepared)
    if not verbose:
        return result
    print(f"{timeit}; {msg}")
    if result is not None:
        print(ObjectAsStr.obj_to_name(result))
    return result
[docs]
@clizefy()
def main(
    prog_name: parameters.pass_name,
    command: LAST_OPTION,
    *args,
    profile: value_converter(lambda _: _, name="FILENAME") = None,  # noqa: F821
    memprofile: value_converter(lambda _: _, name="FILENAME") = None,  # noqa: F821
    verbose=False,
    dry_run=False,
):
    """IMPROVER NWP post-processing toolbox

    Results from commands can be passed into file-like arguments
    of other commands by surrounding them by square brackets::

        improver command [ command ... ] ...

    Spaces around brackets are mandatory.

    Args:
        prog_name:
            The program name from argv[0].
        command (str):
            Command to execute
        args (tuple):
            Command arguments
        profile (str):
            If given, will write profiling to the file given.
            To write to stdout, use a hyphen (-)
        memprofile (str):
            Creates 2 files by adding a suffix to the provided argument -
            a tracemalloc snapshot at the point of highest memory consumption
            of your program (suffixed with _SNAPSHOT)
            and a track of the maximum memory used by your program
            over time (suffixed with _MAX_TRACKER).
        verbose (bool):
            Print executed commands
        dry_run (bool):
            Print commands to be executed

    See improver help [--usage] [command] for more information
    on available command(s).
    """
    # Bracketed groups become nested lists, which execute_command runs as
    # nested commands.
    nested_args = unbracket(args)

    if profile is not None:
        from improver.profile import profile_hook_enable

        # A hyphen means no dump file is given to the profiling hook.
        dump_target = None if profile == "-" else profile
        profile_hook_enable(dump_filename=dump_target)

    runner = execute_command
    if memprofile is not None:
        from improver.memprofile import memory_profile_decorator

        runner = memory_profile_decorator(runner, memprofile)

    return runner(
        SUBCOMMANDS_DISPATCHER,
        prog_name,
        command,
        *nested_args,
        verbose=verbose,
        dry_run=dry_run,
    )
[docs]
def run_main(argv=None):
    """Overrides argv[0] to be 'improver' then runs main.

    The arguments are copied into a new list, so a sequence passed by the
    caller is never modified, and an empty sequence is accepted.

    Args:
        argv (list of str):
            Arguments that were from the command line. Defaults to
            ``sys.argv`` when not supplied.
    """
    import sys

    from clize import run

    # clize help shows module execution as `python -m improver.cli`
    # override argv[0] and pass it explicitly in order to avoid this
    # so that the help command reflects the way that we call improver.
    source = sys.argv if argv is None else argv
    # Build a fresh list rather than assigning to element 0 of the input.
    argv = ["improver", *source[1:]]
    run(main, args=argv)