Source code for fv3config._asset_list

import io
import logging
import os

from ._datastore import (
    get_initial_conditions_directory,
    get_orographic_forcing_directory,
    get_base_forcing_directory,
)
from fv3config._datastore import (
    get_field_table_filename,
    get_diag_table_filename,
    get_data_table_filename,
)
from .config.diag_table import DiagTable
from .config._serialization import dump
from ._exceptions import ConfigError
from . import filesystem


FV3CONFIG_YML_NAME = "fv3config.yml"
logger = logging.getLogger("fv3config")


def is_dict_or_list(option):
    return isinstance(option, dict) or isinstance(option, list)


def ensure_is_list(asset):
    """If argument is a dict, return length 1 list containing dict"""
    if isinstance(asset, dict):
        return [asset]
    elif isinstance(asset, list):
        return asset
    else:
        raise ConfigError("Asset must be a dict or list of dicts")


def get_orographic_forcing_asset_list(config):
    """Return asset_list for orographic forcing"""
    if is_dict_or_list(config["orographic_forcing"]):
        return ensure_is_list(config["orographic_forcing"])
    else:
        source_directory = get_orographic_forcing_directory(config)
        return asset_list_from_path(
            source_directory, target_location="INPUT", copy_method="link"
        )


def get_base_forcing_asset_list(config):
    """Return asset_list for base forcing"""
    if is_dict_or_list(config["forcing"]):
        return ensure_is_list(config["forcing"])
    else:
        source_directory = get_base_forcing_directory(config)
        return asset_list_from_path(source_directory, copy_method="link")


def get_initial_conditions_asset_list(config):
    """Return asset_list for initial conditions. """
    if is_dict_or_list(config["initial_conditions"]):
        return ensure_is_list(config["initial_conditions"])
    else:
        source_directory = get_initial_conditions_directory(config)
        return asset_list_from_path(source_directory, target_location="INPUT")


def get_data_table_asset(config):
    """Return asset for data_table"""
    data_table_filename = get_data_table_filename(config)
    location, name = os.path.split(data_table_filename)
    return get_asset_dict(location, name, target_name="data_table")


def get_diag_table_asset(config):
    """Return asset for diag_table"""
    if isinstance(config["diag_table"], DiagTable):
        return get_bytes_asset_dict(
            bytes(str(config["diag_table"]), "UTF-8"), ".", "diag_table"
        )
    else:
        diag_table_filename = get_diag_table_filename(config)
        location, name = os.path.split(diag_table_filename)
        return get_asset_dict(location, name, target_name="diag_table")


def get_field_table_asset(config):
    """Return asset for field_table"""
    field_table_filename = get_field_table_filename(config)
    location, name = os.path.split(field_table_filename)
    return get_asset_dict(location, name, target_name="field_table")


def get_fv3config_yaml_asset(config):
    """An asset containing this configuration"""
    f = io.StringIO()
    dump(config, f)
    return get_bytes_asset_dict(
        bytes(f.getvalue(), "UTF-8"),
        target_location=".",
        target_name=FV3CONFIG_YML_NAME,
    )


[docs]def get_asset_dict( source_location, source_name, target_location="", target_name=None, copy_method="copy", ): """Helper function to generate asset for a particular file Args: source_location (str): path to directory containing source file source_name (str): filename of source file target_location (str, optional): sub-directory to which file will be written, relative to run directory root. Defaults to empty string (i.e. root of run directory). target_name (str, optional): filename to which file will be written. Defaults to None, in which case source_name is used. copy_method (str, optional): flag to determine whether file is copied ('copy') or hard-linked ('link'). Defaults to 'copy'. Returns: dict: an asset dictionary """ if target_name is None: target_name = source_name asset = { "source_location": source_location, "source_name": source_name, "target_location": target_location, "target_name": target_name, "copy_method": copy_method, } return asset
[docs]def get_bytes_asset_dict( data: bytes, target_location: str, target_name: str, ): """Helper function to define the necessary fields for a binary asset to be saved at a given location. Args: data: the bytes to save target_location: sub-directory to which file will be written, relative to run directory root. Defaults to empty string (i.e. root of run directory). target_name: filename to which file will be written. Defaults to None, in which case source_name is used. Returns: dict: an asset dictionary """ return { "bytes": data, "target_location": target_location, "target_name": target_name, }
def _without_dot(path): if path == ".": return "" else: return path
[docs]def asset_list_from_path(from_location, target_location="", copy_method="copy"): """Return asset_list from all files within a given path. Args: location (str): local path or google cloud storage url. target_location (str, optional): target_location used for generated assets. Defaults to '' which is root of run-directory. copy_method ('copy' or 'link', optional): whether to copy or link assets, defaults to 'copy'. If location is a google cloud storage url, this option is ignored and files are copied. Returns: list: a list of asset dictionaries """ if not filesystem.is_local_path(from_location): copy_method = "copy" asset_list = [] for dirname, basename, relative_target_location in _asset_walk(from_location): asset_list.append( get_asset_dict( dirname, basename, target_location=os.path.join(target_location, relative_target_location), copy_method=copy_method, ) ) return asset_list
def _asset_walk(location): fs = filesystem.get_fs(location) protocol_prefix = filesystem._get_protocol_prefix(location) path_list = fs.walk(location) for dirname, _, files in path_list: dirname = protocol_prefix + dirname subdir_target_location = os.path.relpath(dirname, start=location) for basename in files: yield dirname, basename, _without_dot(subdir_target_location) def write_asset(asset, target_directory): """Write file represented by asset to target_directory Args: asset (dict): an asset dict target_directory (str): path to a directory in which all files will be written """ target_path = os.path.join( target_directory, asset["target_location"], asset["target_name"] ) os.makedirs(os.path.dirname(target_path), exist_ok=True) if "copy_method" in asset: copy_file_asset(asset, target_path) elif "bytes" in asset: logger.debug(f"Writing asset bytes to {target_path}.") with open(target_path, "wb") as f: f.write(asset["bytes"]) else: raise ConfigError( "Cannot write asset. Asset must have either a `copy_method` or `bytes` key." ) def copy_file_asset(asset, target_path): check_asset_has_required_keys(asset) source_path = os.path.join(asset["source_location"], asset["source_name"]) copy_method = asset["copy_method"] if copy_method == "copy": logger.debug(f"Copying asset from {source_path} to {target_path}.") filesystem.get_file(source_path, target_path) elif copy_method == "link": logger.debug(f"Linking asset from {source_path} to {target_path}.") link_file(source_path, target_path) else: raise ConfigError( f"Behavior of copy_method {copy_method} not defined for {source_path} asset" ) def write_assets_to_directory(config, target_directory): asset_list = config_to_asset_list(config) write_asset_list(asset_list, target_directory) def write_asset_list(asset_list, target_directory): """Loop over list of assets and write them all""" for asset in asset_list: write_asset(asset, target_directory) def check_asset_has_required_keys(asset): """Check asset has all of its required keys""" required_asset_keys = [ "source_location", "source_name", "target_location", "target_name", "copy_method", ] for required_asset_key in required_asset_keys: if required_asset_key not in asset: raise ConfigError(f"Assets must have a {required_asset_key}") def config_to_asset_list(config): """Convert a configuration dictionary to an asset list. The asset list will contain all files for the run directory except the namelist.""" asset_list = [] asset_list += get_initial_conditions_asset_list(config) asset_list += get_base_forcing_asset_list(config) asset_list += get_orographic_forcing_asset_list(config) asset_list.append(get_field_table_asset(config)) asset_list.append(get_diag_table_asset(config)) asset_list.append(get_data_table_asset(config)) asset_list.append(get_fv3config_yaml_asset(config)) if "patch_files" in config: if is_dict_or_list(config["patch_files"]): asset_list += ensure_is_list(config["patch_files"]) else: raise ConfigError( "patch_files item in config dictionary must be an asset dict or " "list of asset dicts" ) return asset_list def link_file(source_item, target_item): if any(not filesystem.is_local_path(item) for item in [source_item, target_item]): raise NotImplementedError( "cannot perform linking operation involving remote urls " f"from {source_item} to {target_item}" ) if os.path.exists(target_item): os.remove(target_item) os.symlink(source_item, target_item)