"""Module containing YAML interface for Pastas models using PastaStore."""
import datetime
import logging
import os
import tempfile
from contextlib import contextmanager
from copy import deepcopy
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
import pastas as ps
import yaml
logger = logging.getLogger(__name__)
def _convert_dict_dtypes_for_yaml(d: dict[str, Any]):
"""Convert dictionary values for storing in YAML format (internal function).
Parameters
----------
d : dict
dictionary to parse iteratively
"""
for k, v in d.items():
if isinstance(v, dict):
_convert_dict_dtypes_for_yaml(v)
elif isinstance(v, list) and k == "stress":
for iv in v:
if isinstance(iv, dict):
_convert_dict_dtypes_for_yaml(iv)
elif isinstance(v, pd.Timestamp):
d[k] = v.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(v, datetime.datetime):
d[k] = pd.to_datetime(v).strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(v, pd.Timedelta):
d[k] = str(v.to_timedelta64())
elif isinstance(v, datetime.timedelta):
d[k] = str(pd.to_timedelta(v).to_timedelta64())
elif isinstance(v, np.int64):
d[k] = int(v)
elif isinstance(v, np.float64):
d[k] = float(v)
elif isinstance(v, pd.DataFrame):
d[k] = v.reset_index().to_dict(orient="records")
[docs]
def replace_ts_with_name(d, nearest=False):
"""Replace time series dict with its name in pastas model dict.
Parameters
----------
d : dict
pastas model dictionary
nearest : bool, optional
replace time series with "nearest" option. Warning, this does not
check whether the time series are actually the nearest ones!
"""
for k, v in d.items():
if k in ["oseries", "prec", "evap", "stress"]:
if isinstance(v, dict):
if nearest and k != "oseries":
if k == "stress":
d[k] = "nearest <kind>"
else:
d[k] = f"nearest {k}"
else:
d[k] = v["name"]
elif isinstance(v, list):
if nearest:
d[k] = f"nearest {len(v)} well"
else:
d[k] = [iv["name"] for iv in v]
elif isinstance(v, dict):
replace_ts_with_name(v, nearest=nearest)
[docs]
def reduce_to_minimal_dict(d: dict, keys: list[str] | None = None) -> None:
"""Reduce pastas model dictionary to a minimal form.
This minimal form strives to keep the minimal information that still
allows a model to be constructed. Users are warned, reducing a model
dictionary with this function can lead to a different model than
the original!
Parameters
----------
d : dict
pastas model in dictionary form
keys : list, optional
list of keys to keep, by default None, which defaults to:
["name", "oseries", "settings", "tmin", "tmax", "noise",
"stressmodels", "rfunc", "stress", "prec", "evap", "stressmodel"]
"""
if keys is None:
keys = [
"name",
"oseries",
"settings",
"tmin",
"tmax",
"noise",
"stressmodels",
"rfunc",
"stress",
"prec",
"evap",
"class",
]
# also keep stressmodels by adding names to keys list
if "stressmodels" in d:
keys += list(d["stressmodels"].keys())
iter_d = d.copy() # copy dictionary to use as iterator
# delete keys if not in keys list or if value is None
for k, v in iter_d.items():
if (k not in keys) or (v is None):
del d[k]
elif isinstance(v, dict):
reduce_to_minimal_dict(v, keys=keys)
[docs]
@contextmanager
def temporary_yaml_from_str(yaml: str):
"""Temporary yaml file that is deleted after usage."""
temp = tempfile.NamedTemporaryFile(delete=False)
temp.write(yaml.encode("utf-8"))
temp.close()
try:
yield Path(temp.name)
finally:
os.unlink(temp.name)
[docs]
class PastastoreYAML:
"""Class for reading/writing Pastas models in YAML format.
This class provides a more human-readable form of Pastas models in
comparison to Pastas default .pas (JSON) files. The goal is to provide
users with a simple mini-language to quickly build/test different model
structures. A PastaStore is required as input, which contains existing
models or time series required to build new models. This
class also introduces some shortcuts to simplify building models.
Shortcuts include the option to pass 'nearest' as the name of a stress,
which will automatically select the closest stress of a particular type.
Other shortcuts include certain default options when certain information
is not listed in the YAML file, that will work well in many cases.
Usage
-----
Instantiate the PastastoreYAML class::
pyaml = PastastoreYAML(pstore)
Export a Pastas model to a YAML file::
pyaml.export_model_to_yaml(ml)
Load a Pastas model from a YAML file::
models = pyaml.load_yaml("my_first_model.yaml")
Example YAML file using 'nearest'::
my_first_model: # this is the name of the model
oseries: "oseries1" # name of oseries stored in PastaStore
stressmodels:
recharge: # recognized as RechargeModel by name
prec: "nearest" # use nearest stress with kind="prec"
evap: "EV24_DEELEN" # specific station
river:
stress: "nearest riv" # nearest stress with kind="riv"
wells:
stress: "nearest 3" # nearest 3 stresses with kind="well"
stressmodel: WellModel # provide StressModel type
"""
def __init__(self, pstore):
"""Create for PastasstoreYAML class.
Parameters
----------
pstore : pastastore.PastaStore
PastaStore object containing models to be exported as YAML files
or containing time series that are referenced in YAML files.
"""
self.pstore = pstore
def _parse_rechargemodel_dict(self, d: dict, onam: str | None = None) -> dict:
"""Parse RechargeModel dictionary (internal method).
Note: supports 'nearest' as input to 'prec' and 'evap',
which will automatically select nearest stress with kind="prec" or
"evap". Requires "x" and "y" locations to be present in both oseries
and stresses metadata.
Parameters
----------
d : dict
dictionary containing RechargeModel information
onam : str, optional
name of oseries used when 'nearest' is provided as prec or evap,
by default None
Returns
-------
d : dict
dictionary that can be read by ps.io.base.load(),
containing stresses obtained from PastaStore, and setting
defaults if they were not already provided.
"""
# precipitation
prec_val = d.get("prec", "nearest")
if isinstance(prec_val, dict):
pnam = prec_val["name"]
p = self.pstore.get_stresses(pnam)
prec_val["series"] = p.squeeze()
prec = prec_val
elif prec_val.startswith("nearest"):
if onam is None:
raise ValueError("Provide oseries name when using nearest!")
if len(prec_val.split()) > 1:
kind = prec_val.split()[-1]
else:
kind = "prec"
pnam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
logger.info(" | using nearest stress with kind='%s': '%s'", kind, pnam)
p, pmeta = self.pstore.get_stresses(pnam, return_metadata=True)
prec = {
"name": pnam,
"settings": "prec",
"metadata": pmeta,
"series": p.squeeze(),
}
elif isinstance(prec_val, str):
pnam = d["prec"]
p, pmeta = self.pstore.get_stresses(pnam, return_metadata=True)
prec = {
"name": pnam,
"settings": "prec",
"metadata": pmeta,
"series": p.squeeze(),
}
else:
raise NotImplementedError(f"Could not parse prec value: '{prec_val}'")
d["prec"] = prec
# evaporation
evap_val = d.get("evap", "nearest")
if isinstance(evap_val, dict):
enam = evap_val["name"]
e = self.pstore.get_stresses(enam)
evap_val["series"] = e.squeeze()
evap = evap_val
elif evap_val.startswith("nearest"):
if onam is None:
raise ValueError("Provide oseries name when using nearest!")
if len(evap_val.split()) > 1:
kind = evap_val.split()[-1]
else:
kind = "evap"
enam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
logger.info(" | using nearest stress with kind='%s': '%s'", kind, enam)
e, emeta = self.pstore.get_stresses(enam, return_metadata=True)
evap = {
"name": enam,
"settings": "evap",
"metadata": emeta,
"series": e.squeeze(),
}
elif isinstance(evap_val, str):
enam = d["evap"]
e, emeta = self.pstore.get_stresses(enam, return_metadata=True)
evap = {
"name": enam,
"settings": "evap",
"metadata": emeta,
"series": e.squeeze(),
}
else:
raise NotImplementedError(f"Could not parse evap value: '{evap_val}'")
d["evap"] = evap
# rfunc
if "rfunc" not in d:
logger.info(" | no 'rfunc' provided, using 'Exponential'")
# for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
elif not isinstance(d["rfunc"], dict):
d["rfunc"] = {"class": d["rfunc"]}
# stressmodel
classkey = "class"
if classkey not in d:
d[classkey] = "RechargeModel"
# recharge type (i.e. Linear, FlexModel, etc.)
if ("recharge" not in d) and (d[classkey] == "RechargeModel"):
logger.info(" | no 'recharge' type provided, using 'Linear'")
# if pastas >= 0.23.0, recharge value must be dict with class key
elif not isinstance(d["recharge"], dict):
d["recharge"] = {"class": d["recharge"]}
# tarsomodel logic
if d[classkey] == "TarsoModel":
dmin = d.get("dmin", None)
dmax = d.get("dmin", None)
oseries = d.get("oseries", None)
if ((dmin is None) or (dmax is None)) and (oseries is None):
logger.info(
" | no 'dmin/dmax' or 'oseries' provided,"
" filling in 'oseries': '%s'",
onam,
)
d["oseries"] = onam
if "oseries" in d:
onam = d["oseries"]
if isinstance(onam, str):
o = self.pstore.get_oseries(onam)
d["oseries"] = o.squeeze()
return d
def _parse_stressmodel_dict(self, d: dict, onam: str | None = None) -> dict:
"""Parse StressModel dictionary (internal method).
Note: supports 'nearest' or 'nearest <kind>' as input to 'stress',
which will automatically select nearest stress with kind=<kind>.
Requires "x" and "y" locations to be present in both oseries and
stresses metadata.
Parameters
----------
d : dict
dictionary containing WellModel information
onam : str, optional
name of oseries used when 'nearest <kind>' is provided as stress,
by default None
Returns
-------
d : dict
dictionary that can be read by ps.io.base.load(),
containing stresses obtained from PastaStore, and setting
defaults if they were not already provided.
"""
# get stress
snam = d.pop("stress")
# if list, obtain first and only entry
if isinstance(snam, list):
snam = snam[0]
# if str, either name of single series or 'nearest <kind>'
if snam.startswith("nearest"):
if len(snam.split()) > 1:
kind = snam.split()[-1]
else:
kind = None
if kind == "oseries":
snam = self.pstore.get_nearest_oseries(onam).iloc[0, 0]
else:
snam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
logger.info(" | using nearest stress with kind='%s': %s", kind, snam)
s, smeta = self.pstore.get_stresses(snam, return_metadata=True)
s = {
"name": snam,
"settings": d.pop("settings", None),
"metadata": smeta,
"series": s.squeeze(),
}
d["stress"] = s
# use stress name if not provided
if "name" not in d:
d["name"] = snam
# rfunc
if "rfunc" not in d:
logger.info(" | no 'rfunc' provided, using 'Gamma'")
d["rfunc"] = {"class": "Gamma"}
# for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
elif not isinstance(d["rfunc"], dict):
d["rfunc"] = {"class": d["rfunc"]}
return d
def _parse_wellmodel_dict(self, d: dict, onam: str | None = None) -> dict:
"""Parse WellModel dictionary (internal method).
Note: supports 'nearest' or 'nearest <number> <kind>' as input to
'stress', which will automatically select nearest or <number> of
nearest stress(es) with kind=<kind>. Requires "x" and "y" locations to
be present in both oseries and stresses metadata.
Parameters
----------
d : dict
dictionary containing WellModel information
onam : str, optional
name of oseries used when 'nearest' is provided as stress,
by default None
Returns
-------
d : dict
dictionary that can be read by ps.io.base.load(),
containing stresses obtained from PastaStore, and setting
defaults if they were not already provided.
"""
# parse stress
snames = d.pop("stress")
# if str, either name of single series or 'nearest <n> <kind>'
if isinstance(snames, str):
if snames.startswith("nearest"):
if len(snames.split()) == 3:
n = int(snames.split()[1])
kind = snames.split()[2]
elif len(snames.split()) == 2:
try:
n = int(snames.split()[1])
except ValueError as e:
raise ValueError(
f"Could not parse: '{snames}'! "
"When using option 'nearest' for WellModel, "
"use 'nearest <n>' or 'nearest <n> <kind>'!"
) from e
kind = "well"
elif len(snames.split()) == 1:
n = 1
kind = "well"
snames = (
self.pstore.get_nearest_stresses(onam, kind=kind, n=n)
.iloc[0]
.values
)
logger.info(
" | using %d nearest stress(es) with kind='%s': %s",
n,
kind,
snames,
)
else:
snames = [snames]
# get time series
slist = []
for snam in snames:
s, smeta = self.pstore.get_stresses(snam, return_metadata=True)
sdict = {
"name": snam,
"settings": "well",
"metadata": smeta,
"series": s.squeeze(),
}
slist.append(sdict)
d["stress"] = slist
# get distances
if "distances" not in d:
d["distances"] = self.pstore.get_distances(
oseries=onam, stresses=snames
).values.squeeze()
# use default name if not provided
if "name" not in d:
d["name"] = "wells"
# rfunc
if "rfunc" not in d:
logger.info(" | no 'rfunc' provided, using 'HantushWellModel'")
d["rfunc"] = {"class": "HantushWellModel"}
# for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
elif not isinstance(d["rfunc"], dict):
d["rfunc"] = {"class": d["rfunc"]}
if "up" not in d:
logger.info(
" | no 'up' provided, set to 'False', "
"(i.e. pumping rate is positive for extraction)."
)
d["up"] = False
return d
[docs]
def construct_mldict(self, mlyml: dict, mlnam: str) -> dict:
"""Create Pastas.Model dictionary from YAML dictionary.
Parameters
----------
mlyml : dict
YAML dictionary
mlnam : str
model name
Returns
-------
dict
dictionary of pastas.Model that can be read by Pastas
"""
# get oseries + metadata
if isinstance(mlyml["oseries"], dict):
onam = str(mlyml["oseries"]["name"])
_ = mlyml.pop("oseries")
else:
onam = str(mlyml.pop("oseries"))
logger.info("Building model '%s' for oseries '%s'", mlnam, onam)
o, ometa = self.pstore.get_oseries(onam, return_metadata=True)
# create model to obtain default model settings
ml = ps.Model(o.squeeze(), name=mlnam, metadata=ometa)
mldict = ml.to_dict(series=True)
# update with stored model settings
if "settings" in mlyml:
mldict["settings"].update(mlyml["settings"])
# stressmodels
for smnam, smyml in mlyml["stressmodels"].items():
# set name if not provided
if smyml is not None:
name = smyml.get("name", smnam)
else:
name = smnam
logger.info("| parsing stressmodel: '%s'", name)
# check whether smtyp is defined
classkey = "class"
if smyml is not None:
if classkey in smyml:
smtyp = True
else:
smtyp = False
else:
smtyp = False
# check if RechargeModel based on name if smtyp not defined
if (
smnam.lower() in ["rch", "rech", "recharge", "rechargemodel"]
) and not smtyp:
logger.info(
"| no StressModel type provided, using 'RechargeModel' based on "
"stressmodel name."
)
# check if stressmodel dictionary is empty, create (nearly
# empty) dict so defaults are used
if smyml is None:
mlyml["stressmodels"][smnam] = {"name": "recharge"}
smyml = mlyml["stressmodels"][smnam]
if "name" not in smyml:
smyml["name"] = smnam
smtyp = smyml.get(classkey, "RechargeModel")
else:
# if no info is provided, raise error,
# cannot make any assumptions for non-RechargeModels
if smyml is None:
raise ValueError(
f"Insufficient information for stressmodel '{name}'!"
)
# get stressmodel type, with default StressModel
if classkey in smyml:
smtyp = smyml[classkey]
else:
logger.info(
"| no stressmodel class type provided, using 'StressModel'"
)
smtyp = "StressModel"
# parse dictionary based on smtyp
if smtyp in ["RechargeModel", "TarsoModel"]:
# parse RechargeModel
sm = self._parse_rechargemodel_dict(smyml, onam=onam)
# turn off constant for TarsoModel
if smtyp == "TarsoModel":
mldict["constant"] = False
elif smtyp == "StressModel":
# parse StressModel
sm = self._parse_stressmodel_dict(smyml, onam=onam)
elif smtyp == "WellModel":
# parse WellModel
sm = self._parse_wellmodel_dict(smyml, onam=onam)
else:
raise NotImplementedError(
f"PastaStore.yaml interface does not (yet) support '{smtyp}'!"
)
# add to list
smyml.update(sm)
# update model dict w/ default settings with loaded data
mldict.update(mlyml)
# add name to dictionary if not already present
if "name" not in mldict:
mldict["name"] = mlnam
# convert warmup and time_offset to panads.Timedelta
if "warmup" in mldict["settings"]:
mldict["settings"]["warmup"] = pd.Timedelta(mldict["settings"]["warmup"])
if "time_offset" in mldict["settings"]:
mldict["settings"]["time_offset"] = pd.Timedelta(
mldict["settings"]["time_offset"]
)
return mldict
[docs]
def load(self, fyaml: str) -> list[ps.Model]:
"""Load Pastas YAML file.
Note: currently supports RechargeModel, StressModel and WellModel.
Parameters
----------
fyaml : str
YAML as str or path to file
Returns
-------
models : list
list containing pastas model(s)
Raises
------
ValueError
if insufficient information is provided to construct pastas model
NotImplementedError
if unsupported stressmodel is encountered
"""
if "\n" in fyaml or "\r" in fyaml:
with temporary_yaml_from_str(fyaml) as fyaml:
with Path(fyaml).open("r", encoding="utf-8") as f:
yml = yaml.load(f, Loader=yaml.CFullLoader)
elif Path(fyaml).exists():
with Path(fyaml).open("r", encoding="utf-8") as f:
yml = yaml.load(f, Loader=yaml.CFullLoader)
else:
raise ValueError(
"Could not read YAML file! Check if input is valid YAML "
"or valid path to YAML file."
)
models = []
for mlnam in yml.keys():
mlyml = yml[mlnam]
mldict = self.construct_mldict(mlyml, mlnam)
# Use pastas' internal _load_model - required for model reconstruction
ml = ps.io.base._load_model(mldict) # noqa: SLF001
models.append(ml)
return models
[docs]
def export_stored_models_per_oseries(
self,
oseries: list[str] | str | None = None,
outdir: Path | str = ".",
minimal_yaml: bool | None = False,
use_nearest: bool | None = False,
):
"""Export store models grouped per oseries (location) to YAML file(s).
Note: The oseries names are used as file names.
Parameters
----------
oseries : list[str], optional
list of oseries (location) names, by default None, which uses
all stored oseries for which there are models.
outdir : str, optional
path to output directory, by default "." (current directory)
minimal_yaml : bool, optional
reduce yaml file to include the minimum amount of information
that will still construct a model. Users are warned, using this
option does not guarantee the same model will be constructed
as the one that was exported! Default is False.
use_nearest : bool, optional
if True, replaces time series with "nearest <kind>", filling in
kind where possible. Warning! This does not check whether
the time series are actually the nearest ones! Only used
when minimal_yaml=True. Default is False.
"""
onames = self.pstore.conn.parse_names(oseries, "oseries")
for onam in onames:
try:
onam = int(onam)
except ValueError:
pass
# check if any models exist for oseries
if onam not in self.pstore.oseries_models:
continue
model_list = self.pstore.get_models(
self.pstore.oseries_models[onam],
return_dict=True,
squeeze=False,
)
model_dicts = {}
for d in model_list:
if minimal_yaml:
replace_ts_with_name(d, nearest=use_nearest)
reduce_to_minimal_dict(d)
_convert_dict_dtypes_for_yaml(d)
name = d.pop("name")
model_dicts[name] = d
with (Path(outdir) / f"{onam}.yaml").open("w", encoding="utf-8") as f:
yaml.dump(model_dicts, f, Dumper=yaml.CDumper)
[docs]
def export_models(
self,
models: list[ps.Model] | list[dict] | None = None,
modelnames: list[str] | str | None = None,
outdir: str | Path = ".",
minimal_yaml: bool | None = False,
use_nearest: bool | None = False,
split: bool | None = True,
filename: str = "pastas_models.yaml",
):
"""Export (stored) models to yaml file(s).
Parameters
----------
models : list of ps.Model or dict, optional
pastas Models to write to yaml file(s), if not provided,
uses modelnames to collect stored models to export.
modelnames : list[str], optional
list of model names to export, by default None, which uses
all stored models.
outdir : str, optional
path to output directory, by default "." (current directory)
minimal_yaml : bool, optional
reduce yaml file to include the minimum amount of information
that will still construct a model. Users are warned, using this
option does not guarantee the same model will be constructed
as the one that was exported! Default is False.
use_nearest : bool, optional
if True, replaces time series with "nearest <kind>", filling in
kind where possible. Warning! This does not check whether
the time series are actually the nearest ones! Only used
when minimal_yaml=True. Default is False.
split : bool, optional
if True, split into separate yaml files, otherwise store all
in the same file. The model names are used as file names.
filename : str, optional
filename for YAML file, only used if `split=False`
"""
if models is None:
modelnames = self.pstore.conn.parse_names(modelnames, "models")
model_list = self.pstore.get_models(
modelnames, return_dict=True, squeeze=False
)
else:
model_list = [
iml.to_dict(series=False) if isinstance(iml, ps.Model) else iml
for iml in models
]
# each model in separate file
if split:
for ml in model_list:
self.export_model(ml, outdir=outdir, minimal_yaml=minimal_yaml)
# all models in same file
else:
model_dicts = {}
for d in model_list:
if minimal_yaml:
replace_ts_with_name(d, nearest=use_nearest)
reduce_to_minimal_dict(d)
_convert_dict_dtypes_for_yaml(d)
name = d.pop("name")
model_dicts[name] = d
with (Path(outdir) / filename).open("w", encoding="utf-8") as f:
yaml.dump(model_dicts, f, Dumper=yaml.CDumper)
[docs]
@staticmethod
def export_model(
ml: ps.Model | dict,
outdir: Path | str = ".",
minimal_yaml: bool | None = False,
use_nearest: bool | None = False,
):
"""Write single pastas model to YAML file.
Parameters
----------
ml : ps.Model or dict
pastas model instance or dictionary representing a pastas model
outdir : str, optional
path to output directory, by default "." (current directory)
minimal_yaml : bool, optional
reduce yaml file to include the minimum amount of information
that will still construct a model. Users are warned, using this
option does not guarantee the same model will be constructed
as the one that was exported! Default is False.
use_nearest : bool, optional
if True, replaces time series with "nearest <kind>", filling in
kind where possible. Warning! This does not check whether
the time series are actually the nearest ones! Only used
when minimal_yaml=True. Default is False.
"""
if isinstance(ml, dict):
name = ml["name"]
else:
name = ml.name
with (Path(outdir) / f"{name}.yaml").open("w", encoding="utf-8") as f:
if isinstance(ml, ps.Model):
mldict = deepcopy(ml.to_dict(series=False))
elif isinstance(ml, dict):
mldict = ml
else:
raise TypeError("Only accepts dictionary or pastas.Model!")
mlname = mldict.pop("name")
if minimal_yaml:
replace_ts_with_name(mldict, nearest=use_nearest)
reduce_to_minimal_dict(mldict)
_convert_dict_dtypes_for_yaml(mldict)
yaml.dump({mlname: mldict}, f, Dumper=yaml.CDumper)