import logging
from typing import Sequence, Union, Tuple, Mapping
import dataclasses
import datetime
from enum import Enum
import re
import dacite
from .._exceptions import ConfigError
logger = logging.getLogger("fv3config")
NUMBER_OF_TOKENS_ON_FILE_LINES = 6
NUMBER_OF_TOKENS_ON_FIELD_LINES = 8
[docs]class Packing(Enum):
DOUBLE_PRECISION = 1
SINGLE_PRECISION = 2
[docs]@dataclasses.dataclass
class DiagFieldConfig:
"""Object representing configuration for a field of a diagnostics file.
Args:
module_name: Name of Fortran module containing diagnostic.
field_name: Name of diagnostic within Fortran code.
output_name: Name of diagnostic to use in output NetCDF.
time_sampling: Always set to 'all'.
reduction_method: One of 'none', 'average', 'min', 'max'.
regional_section: 'none' or region specification.
packing: precision for output data.
"""
module_name: str
field_name: str
output_name: str
time_sampling: str = "all"
reduction_method: str = "none"
regional_section: str = "none"
packing: Packing = Packing.SINGLE_PRECISION
[docs]@dataclasses.dataclass
class DiagFileConfig:
"""Object representing a diagnostics file configuration.
Args:
name: Name to use for NetCDF files, not including '.tile?.nc'.
frequency: Period between records in file.
frequency_units: One of 'years', 'months', 'days', 'hours', 'minutes', 'seconds'
field_configs: Sequence of DiagFieldConfigs defining fields to save.
file_format: Always FileFormat.NETCDF.
time_axis_units: Units for time coordinate in output files. One of 'years',
'months', 'days', 'hours', 'minutes', 'seconds'.
time_axis_name: Name for time coordinate in output files.
"""
name: str
frequency: int
frequency_units: str
field_configs: Sequence[DiagFieldConfig]
file_format: FileFormat = FileFormat.NETCDF
time_axis_units: str = "hours"
time_axis_name: str = "time"
[docs]@dataclasses.dataclass
class DiagTable:
"""Representation of diag_table, which controls Fortran diagnostics manager.
Note:
This implementation is based on the diag_table specification described in
https://data1.gfdl.noaa.gov/summer-school/Lectures/July16/03_Seth1_DiagManager.pdf
The MOM6 documentation has a useful description as well:
https://mom6.readthedocs.io/en/latest/api/generated/pages/Diagnostics.html.
Args:
name: label used as attribute in output diagnostic files. Cannot contain
spaces.
base_time: time to be used as reference for time coordinate units.
file_configs: sequence of DiagFileConfig's defining the diagnostics to be
output.
"""
name: str
base_time: datetime.datetime
file_configs: Sequence[DiagFileConfig]
def __post_init__(self,):
if " " in self.name:
raise ConfigError(
f"Name for diag_table cannot have spaces. Got '{self.name}'."
)
def __repr__(self):
"""Representation of diag_table expected by the Fortran model."""
lines = []
lines.append(self.name)
lines.append(self._time_to_str(self.base_time))
lines.append("")
for file_ in self.file_configs:
lines.append(self._file_repr(file_))
lines.append("")
for file_ in self.file_configs:
for field in file_.field_configs:
lines.append(self._field_repr(field, file_.name))
lines.append("")
return "\n".join(lines)
[docs] def asdict(self):
return {
"name": self.name,
"base_time": self.base_time,
"file_configs": [
dataclasses.asdict(file_, dict_factory=self._dict_with_enums_factory)
for file_ in self.file_configs
],
}
@staticmethod
def _dict_with_enums_factory(data):
def convert_value(obj):
if isinstance(obj, Enum):
return obj.value
return obj
return dict((k, convert_value(v)) for k, v in data)
def _file_repr(self, file_: DiagFileConfig) -> str:
tokens = (
file_.name,
file_.frequency,
file_.frequency_units,
file_.file_format.value,
file_.time_axis_units,
file_.time_axis_name,
)
return ", ".join(self._token_to_str(t) for t in tokens)
def _field_repr(self, field: DiagFieldConfig, file_name: str) -> str:
tokens = (
field.module_name,
field.field_name,
field.output_name,
file_name,
field.time_sampling,
field.reduction_method,
field.regional_section,
field.packing.value,
)
return ", ".join(self._token_to_str(t) for t in tokens)
@staticmethod
def _time_to_str(time: datetime.datetime) -> str:
times = [time.year, time.month, time.day, time.hour, time.minute, time.second]
return " ".join([str(t) for t in times])
@staticmethod
def _str_to_time(line: str) -> datetime.datetime:
time_sequence = [int(d) for d in re.findall(r"\d+", line)]
return datetime.datetime(*time_sequence)
@staticmethod
def _str_to_token(arg: str) -> Union[str, int]:
if arg.startswith('"') and arg.endswith('"'):
return arg.strip('"')
elif arg.lower() == ".true.":
# reduction_method can use '.true.' or '"average"' for same meaning
return "average"
elif arg.lower() == ".false.":
# reduction_method can use '.false.' or '"none"' for same meaning
return "none"
else:
return int(arg)
@staticmethod
def _token_to_str(token: Union[str, int]) -> str:
if isinstance(token, str):
return f'"{token}"'
else:
return str(token)
@staticmethod
def _parse_line(line: str) -> Sequence[Union[str, int]]:
token_strings = line.replace(" ", "").split("#")[0].strip(",").split(",")
return list(map(DiagTable._str_to_token, token_strings))
@staticmethod
def _filter_empty_lines(lines: Sequence[str]) -> Sequence[str]:
filtered_lines = [line for line in lines if len(line.strip(" ")) > 0]
filtered_lines = [line for line in filtered_lines if line.strip(" ")[0] != "#"]
return filtered_lines
@staticmethod
def _organize_lines(
parsed_lines: Sequence[str],
) -> Tuple[Sequence[str], Mapping[str, Sequence[str]]]:
"""Separate lines into 1) a sequence of lines describe files and 2) a mapping of
file name to sequence of lines for all fields in that file."""
file_lines = []
field_lines = {}
for tokens in parsed_lines:
if len(tokens) == NUMBER_OF_TOKENS_ON_FILE_LINES:
file_name = tokens[0]
file_lines.append(tokens)
field_lines[file_name] = []
elif len(tokens) == NUMBER_OF_TOKENS_ON_FIELD_LINES:
file_name = tokens[3]
if file_name not in field_lines:
raise ConfigError(
"Files must be defined before they can be used by a field in "
f"diag_table. {file_name} has not been defined yet."
)
field_lines[file_name].append(tokens)
else:
logger.warning(
f"Ignoring a line that could not be parsed in diag_table: {tokens}"
)
return file_lines, field_lines
@staticmethod
def _construct_configs_from_lines(
file_lines, field_lines
) -> Sequence[DiagFileConfig]:
file_configs = []
for file_tokens in file_lines:
file_name = file_tokens[0]
field_configs = []
for field_tokens in field_lines[file_name]:
field_configs.append(
DiagFieldConfig(
module_name=field_tokens[0],
field_name=field_tokens[1],
output_name=field_tokens[2],
time_sampling=field_tokens[4],
reduction_method=field_tokens[5],
regional_section=field_tokens[6],
packing=Packing(field_tokens[7]),
)
)
file_configs.append(
DiagFileConfig(
name=file_name,
frequency=file_tokens[1],
frequency_units=file_tokens[2],
field_configs=field_configs,
file_format=FileFormat(file_tokens[3]),
time_axis_units=file_tokens[4],
time_axis_name=file_tokens[5],
)
)
return file_configs
[docs] @classmethod
def from_dict(cls, diag_table: dict):
file_configs = [
dacite.from_dict(DiagFileConfig, f, config=dacite.Config(cast=[Enum]))
for f in diag_table["file_configs"]
]
return cls(diag_table["name"], diag_table["base_time"], file_configs)
[docs] @classmethod
def from_str(cls, diag_table: str):
"""Initialize DiagTable class from Fortran string representation."""
lines = diag_table.split("\n")
lines = cls._filter_empty_lines(lines)
name = lines[0]
base_time = cls._str_to_time(lines[1])
parsed_lines = list(map(cls._parse_line, lines[2:]))
file_lines, field_lines = cls._organize_lines(parsed_lines)
file_configs = cls._construct_configs_from_lines(file_lines, field_lines)
return cls(name, base_time, file_configs)