import sys
import logging
import contextlib
import resource
import functools
import subprocess
import inspect
import multiprocessing
import os
import tempfile
import warnings
import json
from ..config import write_run_directory, get_n_processes, dump, load
from .. import filesystem

STDOUT_FILENAME = "stdout.log"
STDERR_FILENAME = "stderr.log"
CONFIG_OUT_FILENAME = "fv3config.yml"

logger = logging.getLogger("fv3run")

def call_via_subprocess(module):
    def decorator(func):
        signature = inspect.signature(func)

        def main(argv):
            args, kwargs = json.loads(argv[1])
            func(*args, **kwargs)

        def command(*args, **kwargs) -> str:
            # check that args and kwargs match func
            # raises TypeError if not
            signature.bind(*args, **kwargs)

            serialized = json.dumps([args, kwargs])
            return ["python", "-m", module, serialized]

        func.main = main
        func.command = command
        return func

    return decorator

[docs]@call_via_subprocess("fv3config.fv3run._native_main") def run_native( config_dict_or_location, outdir, runfile=None, capture_output: bool = True ): """Run the FV3GFS model with the given configuration. Copies the resulting directory to a target location. Will use the Google cloud storage key at ``$GOOGLE_APPLICATION_CREDENTIALS`` by default. Requires the fv3gfs-python package. Args: config_dict_or_location (dict or str): a configuration dictionary, or a location (local or on Google cloud storage) of a yaml file containing a configuration dictionary outdir (str): location to copy the resulting run directory runfile (str, optional): Python model script to use in place of the default. capture_output (bool, optional): If true, then the stderr and stdout streams will be redirected to the files `outdir/stderr.log` and `outdir/stdout.log` respectively. """ _set_stacksize_unlimited() with _temporary_directory(outdir) as localdir: config_out_filename = os.path.join(localdir, CONFIG_OUT_FILENAME) # we need to write the dict to the run directory for archival and also load # the dict, it ends up being convenient to do both at once config_dict = _get_config_dict_and_write( config_dict_or_location, config_out_filename ) write_run_directory(config_dict, localdir) if runfile is not None: filesystem.get_file( runfile, os.path.join(localdir, os.path.basename(runfile)) ) with _output_stream_context(localdir, capture_output) as (stdout, stderr): n_processes = get_n_processes(config_dict) _run_experiment( localdir, n_processes, runfile=runfile, mpi_flags=_add_oversubscribe_if_necessary(MPI_FLAGS, n_processes), stdout=stdout, stderr=stderr, )
def _set_stacksize_unlimited(): try: resource.setrlimit( resource.RLIMIT_STACK, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) ) except ValueError: warnings.warn( "could not remove stacksize limit, may run out of memory as a result" ) def _add_oversubscribe_if_necessary(mpi_flags, n_processes): try: cpu_count = multiprocessing.cpu_count() if cpu_count < n_processes: mpi_flags += ["--oversubscribe"] except NotImplementedError: warnings.warn( "could not determine cpu count, assuming number of processors" "is at least as many as number of MPI tasks" ) return mpi_flags @contextlib.contextmanager def _temporary_directory(outdir): fs = filesystem.get_fs(outdir) if not filesystem.is_local_path(outdir): with tempfile.TemporaryDirectory() as tempdir: try: yield tempdir finally:"Copying output to %s", outdir) fs.makedirs(outdir, exist_ok=True) filesystem.put_directory(tempdir, outdir) else: fs.makedirs(outdir, exist_ok=True) yield outdir def _captured_output_context(localdir): out_filename = os.path.join(localdir, STDOUT_FILENAME) err_filename = os.path.join(localdir, STDERR_FILENAME) with open(out_filename, "wb") as out_file, open(err_filename, "wb") as err_file: try: yield out_file, err_file except subprocess.CalledProcessError as e: logger.critical( "Experiment failed. " "Check %s and %s for logs.", STDOUT_FILENAME, STDERR_FILENAME, ) raise e def _uncaptured_output_context(localdir): try: yield sys.stdout, sys.stderr except subprocess.CalledProcessError as e: logger.critical("Experiment failed") raise e @contextlib.contextmanager def _output_stream_context(localdir: str, capture_output: bool):"running experiment") if capture_output: yield from _captured_output_context(localdir) else: yield from _uncaptured_output_context(localdir) def _get_python_command(runfile): python_args = ["python3", "-m", "mpi4py"] if runfile is not None: python_args.append(os.path.basename(runfile)) elif RUNFILE_ENV_VAR in os.environ: python_args.append(os.environ[RUNFILE_ENV_VAR]) else: python_args += ["-m", ""] return python_args def _run_experiment( dirname, n_processes, runfile, mpi_flags=None, stdout=None, stderr=None ): if mpi_flags is None: mpi_flags = [] python_command = _get_python_command(runfile)"Running experiment in %s", dirname) subprocess.check_call( ["mpirun", "-n", str(n_processes)] + mpi_flags + python_command, cwd=dirname, stdout=stderr, stderr=stdout, ) def _get_config_dict_and_write(config_dict_or_location, config_out_filename): if isinstance(config_dict_or_location, dict): config_dict = config_dict_or_location with, "w") as f: dump(config_dict, f) else: config_dict = _copy_and_load_config_dict( config_dict_or_location, config_out_filename ) return config_dict def _copy_and_load_config_dict(config_location, local_target_location): filesystem.get_file(config_location, local_target_location) with open(local_target_location, "r") as infile: config_dict = load(infile) return config_dict if __name__ == "__main__": # In theory this warning should never be triggered. # There's probably a bug in run_native.command if it is. # Remove this main block after some time if it never gets triggered. warnings.warn( "calling fv3config.fv3run._native is deprecated, call fv3config.fv3run._native_main instead", DeprecationWarning, ) run_native.main(sys.argv)