"""Source code for pastastore.yaml_interface."""

import datetime
import logging
import os
from copy import deepcopy
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
import pastas as ps
import yaml

from pastastore.version import PASTAS_LEQ_022


# Module-level logger; handlers/levels are expected to be configured by the application.
logger = logging.getLogger(__name__)


def _convert_dict_dtypes_for_yaml(d: Dict[str, Any]):
    """Internal method to convert dictionary values for storing in YAML format.

    Parameters
    ----------
    d : dict
        dictionary to parse iteratively
    """
    for k, v in d.items():
        if isinstance(v, dict):
            _convert_dict_dtypes_for_yaml(v)
        elif isinstance(v, list) and k == "stress":
            for iv in v:
                if isinstance(iv, dict):
                    _convert_dict_dtypes_for_yaml(iv)
        elif isinstance(v, pd.Timestamp):
            d[k] = v.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(v, datetime.datetime):
            d[k] = pd.to_datetime(v).strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(v, pd.Timedelta):
            d[k] = v.to_timedelta64().__str__()
        elif isinstance(v, datetime.timedelta):
            d[k] = pd.to_timedelta(v).to_timedelta64().__str__()
        elif isinstance(v, np.int64):
            d[k] = int(v)
        elif isinstance(v, np.float64):
            d[k] = float(v)
        elif isinstance(v, pd.DataFrame):
            d[k] = v.reset_index().to_dict(orient="records")


def replace_ts_with_name(d, nearest=False):
    """Replace time series dict with its name in pastas model dict.

    Mutates ``d`` in-place: any time series dictionary stored under the keys
    "oseries", "prec", "evap" or "stress" is collapsed to just its name (or a
    "nearest ..." placeholder when ``nearest=True``).

    Parameters
    ----------
    d : dict
        pastas model dictionary
    nearest : bool, optional
        replace time series with "nearest" option. Warning, this does not
        check whether the time series are actually the nearest ones!
    """
    ts_keys = ("oseries", "prec", "evap", "stress")
    for key, value in d.items():
        if key in ts_keys:
            if isinstance(value, dict):
                if nearest and key != "oseries":
                    # stress kind cannot be inferred here, leave a placeholder
                    d[key] = "nearest <kind>" if key == "stress" else f"nearest {key}"
                else:
                    d[key] = value["name"]
            elif isinstance(value, list):
                if nearest:
                    d[key] = f"nearest {len(value)} well"
                else:
                    d[key] = [entry["name"] for entry in value]
        elif isinstance(value, dict):
            # descend into nested dicts (e.g. the stressmodels mapping)
            replace_ts_with_name(value, nearest=nearest)
def reduce_to_minimal_dict(d, keys=None):
    """Reduce pastas model dictionary to a minimal form.

    This minimal form strives to keep the minimal information that still
    allows a model to be constructed. Users are warned, reducing a model
    dictionary with this function can lead to a different model than the
    original!

    Parameters
    ----------
    d : dict
        pastas model in dictionary form
    keys : list, optional
        list of keys to keep, by default None, which defaults to:
        ["name", "oseries", "settings", "tmin", "tmax", "noise",
        "stressmodels", "rfunc", "stress", "prec", "evap", "stressmodel"]
    """
    if keys is None:
        keys = [
            "name",
            "oseries",
            "settings",
            "tmin",
            "tmax",
            "noise",
            "stressmodels",
            "rfunc",
            "stress",
            "prec",
            "evap",
            # pastas <= 0.22 uses "stressmodel", newer versions use "class"
            "stressmodel" if PASTAS_LEQ_022 else "class",
        ]

    # keep stressmodel entries by adding their names to the keys list
    # (in-place extend so recursive calls see the added names as well)
    if "stressmodels" in d:
        keys += list(d["stressmodels"].keys())

    # iterate over a snapshot so entries can be deleted from d while looping;
    # drop keys that are not whitelisted or whose value is None
    for key, value in list(d.items()):
        if key not in keys or value is None:
            del d[key]
        elif isinstance(value, dict):
            reduce_to_minimal_dict(value, keys=keys)
class PastastoreYAML:
    """Class for reading/writing Pastas models in YAML format.

    This class provides a more human-readable form of Pastas models in
    comparison to Pastas default .pas (JSON) files. The goal is to provide
    users with a simple mini-language to quickly build/test different model
    structures. A PastaStore is required as input, which contains existing
    models or time series required to build new models. This class also
    introduces some shortcuts to simplify building models. Shortcuts include
    the option to pass 'nearest' as the name of a stress, which will
    automatically select the closest stress of a particular type. Other
    shortcuts include certain default options when certain information is
    not listed in the YAML file, that will work well in many cases.

    Usage
    -----
    Instantiate the PastastoreYAML class::

        pyaml = PastastoreYAML(pstore)

    Export a Pastas model to a YAML file::

        pyaml.export_model_to_yaml(ml)

    Load a Pastas model from a YAML file::

        models = pyaml.load_yaml("my_first_model.yaml")

    Example YAML file using 'nearest'::

        my_first_model:  # this is the name of the model
          oseries: "oseries1"  # name of oseries stored in PastaStore
          stressmodels:
            recharge:  # recognized as RechargeModel by name
              prec: "nearest"  # use nearest stress with kind="prec"
              evap: "EV24_DEELEN"  # specific station
            river:
              stress: "nearest riv"  # nearest stress with kind="riv"
            wells:
              stress: "nearest 3"  # nearest 3 stresses with kind="well"
              stressmodel: WellModel  # provide StressModel type
    """

    def __init__(self, pstore):
        """Constructor for PastastoreYAML class.

        Parameters
        ----------
        pstore : pastastore.PastaStore
            PastaStore object containing models to be exported as YAML files
            or containing time series that are referenced in YAML files.
        """
        self.pstore = pstore

    def _parse_rechargemodel_dict(self, d: Dict, onam: Optional[str] = None) -> Dict:
        """Internal method to parse RechargeModel dictionary.

        Note: supports 'nearest' as input to 'prec' and 'evap', which will
        automatically select nearest stress with kind="prec" or "evap".
        Requires "x" and "y" locations to be present in both oseries and
        stresses metadata.

        Parameters
        ----------
        d : dict
            dictionary containing RechargeModel information
        onam : str, optional
            name of oseries used when 'nearest' is provided as prec or
            evap, by default None

        Returns
        -------
        d : dict
            dictionary that can be read by ps.io.base.load(), containing
            stresses obtained from PastaStore, and setting defaults if they
            were not already provided.

        Raises
        ------
        ValueError
            if 'nearest' is used but no oseries name is provided
        NotImplementedError
            if the prec/evap value cannot be parsed
        """
        # precipitation
        prec_val = d.get("prec", "nearest")
        if isinstance(prec_val, dict):
            # already a time series dict, only fetch the series itself
            pnam = prec_val["name"]
            p = self.pstore.get_stresses(pnam)
            prec_val["series"] = p
            prec = prec_val
        elif prec_val.startswith("nearest"):
            if onam is None:
                raise ValueError("Provide oseries name when using nearest!")
            # optional kind suffix: "nearest <kind>", default kind="prec"
            if len(prec_val.split()) > 1:
                kind = prec_val.split()[-1]
            else:
                kind = "prec"
            pnam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
            logger.info(f" | using nearest stress with kind='{kind}': '{pnam}'")
            p, pmeta = self.pstore.get_stresses(pnam, return_metadata=True)
            prec = {
                "name": pnam,
                "settings": "prec",
                "metadata": pmeta,
                "series": p,
            }
        elif isinstance(prec_val, str):
            # plain stress name
            pnam = d["prec"]
            p, pmeta = self.pstore.get_stresses(pnam, return_metadata=True)
            prec = {
                "name": pnam,
                "settings": "prec",
                "metadata": pmeta,
                "series": p,
            }
        else:
            raise NotImplementedError(f"Could not parse prec value: '{prec_val}'")
        d["prec"] = prec

        # evaporation (same resolution logic as precipitation)
        evap_val = d.get("evap", "nearest")
        if isinstance(evap_val, dict):
            enam = evap_val["name"]
            e = self.pstore.get_stresses(enam)
            evap_val["series"] = e
            evap = evap_val
        elif evap_val.startswith("nearest"):
            if onam is None:
                raise ValueError("Provide oseries name when using nearest!")
            if len(evap_val.split()) > 1:
                kind = evap_val.split()[-1]
            else:
                kind = "evap"
            enam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
            logger.info(f" | using nearest stress with kind='{kind}': '{enam}'")
            e, emeta = self.pstore.get_stresses(enam, return_metadata=True)
            evap = {
                "name": enam,
                "settings": "evap",
                "metadata": emeta,
                "series": e,
            }
        elif isinstance(evap_val, str):
            enam = d["evap"]
            e, emeta = self.pstore.get_stresses(enam, return_metadata=True)
            evap = {
                "name": enam,
                "settings": "evap",
                "metadata": emeta,
                "series": e,
            }
        else:
            raise NotImplementedError(f"Could not parse evap value: '{evap_val}'")
        d["evap"] = evap

        # rfunc
        if "rfunc" not in d:
            # not set explicitly: pastas' RechargeModel default is used
            logger.info(" | no 'rfunc' provided, using 'Exponential'")
        # for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
        elif not isinstance(d["rfunc"], dict) and not PASTAS_LEQ_022:
            d["rfunc"] = {"class": d["rfunc"]}

        # stressmodel
        classkey = "stressmodel" if PASTAS_LEQ_022 else "class"
        if classkey not in d:
            d[classkey] = "RechargeModel"

        # recharge type (i.e. Linear, FlexModel, etc.)
        if ("recharge" not in d) and (d[classkey] == "RechargeModel"):
            logger.info(" | no 'recharge' type provided, using 'Linear'")
        # if pastas >= 0.23.0, recharge value must be dict with class key
        # BUGFIX: guard on key presence, otherwise a TarsoModel without an
        # explicit 'recharge' entry raised a KeyError here
        elif (
            "recharge" in d
            and not isinstance(d["recharge"], dict)
            and not PASTAS_LEQ_022
        ):
            d["recharge"] = {"class": d["recharge"]}

        # tarsomodel logic
        if d[classkey] == "TarsoModel":
            dmin = d.get("dmin", None)
            # BUGFIX: was d.get("dmin"), so dmax always mirrored dmin
            dmax = d.get("dmax", None)
            oseries = d.get("oseries", None)
            if ((dmin is None) or (dmax is None)) and (oseries is None):
                logger.info(
                    " | no 'dmin/dmax' or 'oseries' provided,"
                    f" filling in 'oseries': '{onam}'"
                )
                d["oseries"] = onam

        # replace oseries name by the actual series
        if "oseries" in d:
            onam = d["oseries"]
            if isinstance(onam, str):
                o = self.pstore.get_oseries(onam)
                d["oseries"] = o
        return d

    def _parse_stressmodel_dict(self, d: Dict, onam: Optional[str] = None) -> Dict:
        """Internal method to parse StressModel dictionary.

        Note: supports 'nearest' or 'nearest <kind>' as input to 'stress',
        which will automatically select nearest stress with kind=<kind>.
        Requires "x" and "y" locations to be present in both oseries and
        stresses metadata.

        Parameters
        ----------
        d : dict
            dictionary containing StressModel information
        onam : str, optional
            name of oseries used when 'nearest <kind>' is provided as
            stress, by default None

        Returns
        -------
        d : dict
            dictionary that can be read by ps.io.base.load(), containing
            stresses obtained from PastaStore, and setting defaults if they
            were not already provided.
        """
        # get stress
        snam = d.pop("stress")
        # if list, obtain first and only entry
        if isinstance(snam, list):
            snam = snam[0]
        # if str, either name of single series or 'nearest <kind>'
        if snam.startswith("nearest"):
            if len(snam.split()) > 1:
                kind = snam.split()[-1]
            else:
                kind = None
            if kind == "oseries":
                snam = self.pstore.get_nearest_oseries(onam).iloc[0, 0]
            else:
                snam = self.pstore.get_nearest_stresses(onam, kind=kind).iloc[0, 0]
            logger.info(f" | using nearest stress with kind='{kind}': {snam}")

        s, smeta = self.pstore.get_stresses(snam, return_metadata=True)
        s = {
            "name": snam,
            "settings": d.pop("settings", None),
            "metadata": smeta,
            "series": s,
        }
        # pastas <= 0.22 expects a list of stresses here
        d["stress"] = [s] if PASTAS_LEQ_022 else s

        # use stress name if not provided
        if "name" not in d:
            d["name"] = snam

        # rfunc
        if "rfunc" not in d:
            logger.info(" | no 'rfunc' provided, using 'Gamma'")
            d["rfunc"] = "Gamma" if PASTAS_LEQ_022 else {"class": "Gamma"}
        # for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
        elif not isinstance(d["rfunc"], dict) and not PASTAS_LEQ_022:
            d["rfunc"] = {"class": d["rfunc"]}

        return d

    def _parse_wellmodel_dict(self, d: Dict, onam: Optional[str] = None) -> Dict:
        """Internal method to parse WellModel dictionary.

        Note: supports 'nearest' or 'nearest <number> <kind>' as input to
        'stress', which will automatically select nearest or <number> of
        nearest stress(es) with kind=<kind>. Requires "x" and "y" locations
        to be present in both oseries and stresses metadata.

        Parameters
        ----------
        d : dict
            dictionary containing WellModel information
        onam : str, optional
            name of oseries used when 'nearest' is provided as stress,
            by default None

        Returns
        -------
        d : dict
            dictionary that can be read by ps.io.base.load(), containing
            stresses obtained from PastaStore, and setting defaults if they
            were not already provided.

        Raises
        ------
        ValueError
            if the 'nearest ...' specification cannot be parsed
        """
        # parse stress
        snames = d.pop("stress")

        # if str, either name of single series or 'nearest <n> <kind>'
        if isinstance(snames, str):
            if snames.startswith("nearest"):
                if len(snames.split()) == 3:
                    n = int(snames.split()[1])
                    kind = snames.split()[2]
                elif len(snames.split()) == 2:
                    try:
                        n = int(snames.split()[1])
                    except ValueError:
                        raise ValueError(
                            f"Could not parse: '{snames}'! "
                            "When using option 'nearest' for WellModel, "
                            "use 'nearest <n>' or 'nearest <n> <kind>'!"
                        )
                    kind = "well"
                elif len(snames.split()) == 1:
                    n = 1
                    kind = "well"
                snames = (
                    self.pstore.get_nearest_stresses(onam, kind=kind, n=n)
                    .iloc[0]
                    .values
                )
                logger.info(
                    f" | using {n} nearest stress(es) with kind='{kind}': "
                    f"{snames}"
                )
            else:
                snames = [snames]

        # get time series
        slist = []
        for snam in snames:
            s, smeta = self.pstore.get_stresses(snam, return_metadata=True)
            sdict = {
                "name": snam,
                "settings": "well",
                "metadata": smeta,
                "series": s,
            }
            slist.append(sdict)
        d["stress"] = slist

        # get distances between oseries and stresses
        if "distances" not in d:
            d["distances"] = self.pstore.get_distances(
                oseries=onam, stresses=snames
            ).values.squeeze()

        # use default name if not provided
        if "name" not in d:
            d["name"] = "wells"

        # rfunc
        if "rfunc" not in d:
            logger.info(" | no 'rfunc' provided, using 'HantushWellModel'")
            d["rfunc"] = (
                "HantushWellModel" if PASTAS_LEQ_022 else {"class": "HantushWellModel"}
            )
        # for pastas >= 0.23.0, convert rfunc value to dictionary with 'class' key
        elif not isinstance(d["rfunc"], dict) and not PASTAS_LEQ_022:
            d["rfunc"] = {"class": d["rfunc"]}

        if "up" not in d:
            logger.info(
                " | no 'up' provided, set to 'False', "
                "(i.e. pumping rate is positive for extraction)."
            )
            d["up"] = False

        return d

    def construct_mldict(self, mlyml: dict, mlnam: str) -> dict:
        """Construct a pastas model dictionary from parsed YAML content.

        Parameters
        ----------
        mlyml : dict
            model section loaded from a YAML file
        mlnam : str
            name of the model

        Returns
        -------
        mldict : dict
            dictionary that can be read by ps.io.base._load_model()

        Raises
        ------
        ValueError
            if insufficient information is provided for a stressmodel
        NotImplementedError
            if an unsupported stressmodel type is encountered
        """
        # get oseries + metadata
        if isinstance(mlyml["oseries"], dict):
            onam = str(mlyml["oseries"]["name"])
            _ = mlyml.pop("oseries")
        else:
            onam = str(mlyml.pop("oseries"))

        logger.info(f"Building model '{mlnam}' for oseries '{onam}'")
        o, ometa = self.pstore.get_oseries(onam, return_metadata=True)

        # create model to obtain default model settings
        ml = ps.Model(o, name=mlnam, metadata=ometa)
        mldict = ml.to_dict(series=True)

        # update with stored model settings
        if "settings" in mlyml:
            mldict["settings"].update(mlyml["settings"])

        # stressmodels
        for smnam, smyml in mlyml["stressmodels"].items():
            # set name if not provided
            if smyml is not None:
                name = smyml.get("name", smnam)
            else:
                name = smnam
            logger.info(f"| parsing stressmodel: '{name}'")

            # check whether smtyp is defined
            classkey = "stressmodel" if PASTAS_LEQ_022 else "class"
            if smyml is not None:
                if PASTAS_LEQ_022:
                    # normalize newer 'class' key to older 'stressmodel' key
                    if "class" in smyml:
                        smyml["stressmodel"] = smyml.pop("class")
                if classkey in smyml:
                    smtyp = True
                else:
                    smtyp = False
            else:
                smtyp = False

            # check if RechargeModel based on name if smtyp not defined
            if (
                smnam.lower() in ["rch", "rech", "recharge", "rechargemodel"]
            ) and not smtyp:
                logger.info("| assuming RechargeModel based on stressmodel name.")
                # check if stressmodel dictionary is empty, create (nearly
                # empty) dict so defaults are used
                if smyml is None:
                    mlyml["stressmodels"][smnam] = {"name": "recharge"}
                    smyml = mlyml["stressmodels"][smnam]
                if "name" not in smyml:
                    smyml["name"] = smnam
                smtyp = smyml.get(classkey, "RechargeModel")
            else:
                # if no info is provided, raise error,
                # cannot make any assumptions for non-RechargeModels
                if smyml is None:
                    raise ValueError(
                        "Insufficient information " f"for stressmodel '{name}'!"
                    )
                # get stressmodel type, with default StressModel
                if classkey in smyml:
                    smtyp = smyml[classkey]
                else:
                    logger.info(
                        "| no stressmodel class type provided, "
                        "using 'StressModel'"
                    )
                    smtyp = "StressModel"

            # parse dictionary based on smtyp
            if smtyp in ["RechargeModel", "TarsoModel"]:
                # parse RechargeModel
                sm = self._parse_rechargemodel_dict(smyml, onam=onam)
                # turn off constant for TarsoModel
                if smtyp == "TarsoModel":
                    mldict["constant"] = False
            elif smtyp == "StressModel":
                # parse StressModel
                sm = self._parse_stressmodel_dict(smyml, onam=onam)
            elif smtyp == "WellModel":
                # parse WellModel
                sm = self._parse_wellmodel_dict(smyml, onam=onam)
            else:
                raise NotImplementedError(
                    "PastaStore.yaml interface does " f"not (yet) support '{smtyp}'!"
                )

            # add to list
            smyml.update(sm)

        # update model dict w/ default settings with loaded data
        mldict.update(mlyml)

        # add name to dictionary if not already present
        if "name" not in mldict:
            mldict["name"] = mlnam

        # convert warmup and time_offset to pandas.Timedelta
        if "warmup" in mldict["settings"]:
            mldict["settings"]["warmup"] = pd.Timedelta(mldict["settings"]["warmup"])
        if "time_offset" in mldict["settings"]:
            mldict["settings"]["time_offset"] = pd.Timedelta(
                mldict["settings"]["time_offset"]
            )
        return mldict

    def load(self, fyaml: str) -> List[ps.Model]:
        """Load Pastas YAML file.

        Note: currently supports RechargeModel, StressModel and WellModel.

        Parameters
        ----------
        fyaml : str
            path to file

        Returns
        -------
        models : list
            list containing pastas model(s)

        Raises
        ------
        ValueError
            if insufficient information is provided to construct pastas model
        NotImplementedError
            if unsupported stressmodel is encountered
        """
        with open(fyaml, "r") as f:
            yml = yaml.load(f, Loader=yaml.CFullLoader)

        models = []
        for mlnam in yml.keys():
            mlyml = yml[mlnam]
            mldict = self.construct_mldict(mlyml, mlnam)
            # load model
            ml = ps.io.base._load_model(mldict)
            models.append(ml)
        return models

    def export_stored_models_per_oseries(
        self,
        oseries: Optional[Union[List[str], str]] = None,
        outdir: Optional[str] = ".",
        minimal_yaml: Optional[bool] = False,
        use_nearest: Optional[bool] = False,
    ):
        """Export store models grouped per oseries (location) to YAML file(s).

        Note: The oseries names are used as file names.

        Parameters
        ----------
        oseries : list of str, optional
            list of oseries (location) names, by default None, which uses
            all stored oseries for which there are models.
        outdir : str, optional
            path to output directory, by default "." (current directory)
        minimal_yaml : bool, optional
            reduce yaml file to include the minimum amount of information
            that will still construct a model. Users are warned, using this
            option does not guarantee the same model will be constructed as
            the one that was exported! Default is False.
        use_nearest : bool, optional
            if True, replaces time series with "nearest <kind>", filling in
            kind where possible. Warning! This does not check whether the
            time series are actually the nearest ones! Only used when
            minimal_yaml=True. Default is False.
        """
        onames = self.pstore.conn._parse_names(oseries, "oseries")

        for onam in onames:
            # oseries names may be stored as integers
            try:
                onam = int(onam)
            except ValueError:
                pass
            # check if any models exist for oseries
            if onam not in self.pstore.oseries_models:
                continue
            model_list = self.pstore.get_models(
                self.pstore.oseries_models[onam],
                return_dict=True,
                squeeze=False,
            )
            model_dicts = {}
            for d in model_list:
                if minimal_yaml:
                    replace_ts_with_name(d, nearest=use_nearest)
                    reduce_to_minimal_dict(d)
                _convert_dict_dtypes_for_yaml(d)
                name = d.pop("name")
                model_dicts[name] = d
            with open(os.path.join(outdir, f"{onam}.yaml"), "w") as f:
                yaml.dump(model_dicts, f, Dumper=yaml.CDumper)

    def export_models(
        self,
        models: Optional[Union[List[ps.Model], List[Dict]]] = None,
        modelnames: Optional[Union[List[str], str]] = None,
        outdir: Optional[str] = ".",
        minimal_yaml: Optional[bool] = False,
        use_nearest: Optional[bool] = False,
        split: Optional[bool] = True,
        filename: Optional[str] = "pastas_models.yaml",
    ):
        """Export (stored) models to yaml file(s).

        Parameters
        ----------
        models : list of ps.Model or dict, optional
            pastas Models to write to yaml file(s), if not provided,
            uses modelnames to collect stored models to export.
        modelnames : list of str, optional
            list of model names to export, by default None, which uses
            all stored models.
        outdir : str, optional
            path to output directory, by default "." (current directory)
        minimal_yaml : bool, optional
            reduce yaml file to include the minimum amount of information
            that will still construct a model. Users are warned, using this
            option does not guarantee the same model will be constructed as
            the one that was exported! Default is False.
        use_nearest : bool, optional
            if True, replaces time series with "nearest <kind>", filling in
            kind where possible. Warning! This does not check whether the
            time series are actually the nearest ones! Only used when
            minimal_yaml=True. Default is False.
        split : bool, optional
            if True, split into separate yaml files, otherwise store all
            in the same file. The model names are used as file names.
        filename : str, optional
            filename for YAML file, only used if `split=False`
        """
        if models is None:
            modelnames = self.pstore.conn._parse_names(modelnames, "models")
            model_list = self.pstore.get_models(
                modelnames, return_dict=True, squeeze=False
            )
        else:
            model_list = [
                iml.to_dict(series=False) if isinstance(iml, ps.Model) else iml
                for iml in models
            ]
        # each model in separate file
        if split:
            for ml in model_list:
                # BUGFIX: forward use_nearest, it was silently dropped before
                self.export_model(
                    ml,
                    outdir=outdir,
                    minimal_yaml=minimal_yaml,
                    use_nearest=use_nearest,
                )
        # all models in same file
        else:
            model_dicts = {}
            for d in model_list:
                if minimal_yaml:
                    replace_ts_with_name(d, nearest=use_nearest)
                    reduce_to_minimal_dict(d)
                _convert_dict_dtypes_for_yaml(d)
                name = d.pop("name")
                model_dicts[name] = d
            with open(os.path.join(outdir, filename), "w") as f:
                yaml.dump(model_dicts, f, Dumper=yaml.CDumper)

    @staticmethod
    def export_model(
        ml: Union[ps.Model, dict],
        outdir: Optional[str] = ".",
        minimal_yaml: Optional[bool] = False,
        use_nearest: Optional[bool] = False,
    ):
        """Write single pastas model to YAML file.

        Parameters
        ----------
        ml : ps.Model or dict
            pastas model instance or dictionary representing a pastas model
        outdir : str, optional
            path to output directory, by default "." (current directory)
        minimal_yaml : bool, optional
            reduce yaml file to include the minimum amount of information
            that will still construct a model. Users are warned, using this
            option does not guarantee the same model will be constructed as
            the one that was exported! Default is False.
        use_nearest : bool, optional
            if True, replaces time series with "nearest <kind>", filling in
            kind where possible. Warning! This does not check whether the
            time series are actually the nearest ones! Only used when
            minimal_yaml=True. Default is False.

        Raises
        ------
        TypeError
            if ml is neither a dictionary nor a pastas.Model
        """
        if isinstance(ml, dict):
            name = ml["name"]
        else:
            name = ml.name
        with open(os.path.join(outdir, f"{name}.yaml"), "w") as f:
            if isinstance(ml, ps.Model):
                mldict = deepcopy(ml.to_dict(series=False))
            elif isinstance(ml, dict):
                mldict = ml
            else:
                raise TypeError("Only accepts dictionary or pastas.Model!")
            mlname = mldict.pop("name")
            if minimal_yaml:
                replace_ts_with_name(mldict, nearest=use_nearest)
                reduce_to_minimal_dict(mldict)
            _convert_dict_dtypes_for_yaml(mldict)
            yaml.dump({mlname: mldict}, f, Dumper=yaml.CDumper)