import json
import os
import warnings
from typing import List, Optional, Tuple, Union
import numpy as np
import pandas as pd
import pastas as ps
from packaging.version import parse as parse_version
from pastas.io.pas import pastas_hook
from tqdm.auto import tqdm
from pastastore.base import BaseConnector
from pastastore.connectors import DictConnector
from pastastore.plotting import Maps, Plots
from pastastore.util import _custom_warning
from pastastore.yaml_interface import PastastoreYAML
FrameorSeriesUnion = Union[pd.DataFrame, pd.Series]
warnings.showwarning = _custom_warning
[docs]
class PastaStore:
"""PastaStore object for managing pastas time series and models.
Requires a Connector object to provide the interface to
the database. Different Connectors are available, e.g.:
- PasConnector for storing all data as .pas (JSON) files on disk (recommended)
- DictConnector for storing all data in dictionaries (in-memory)
- ArcticConnector for saving data to MongoDB using the Arctic module
- PystoreConnector for saving data to disk using the Pystore module
Parameters
----------
connector : Connector object
object that provides the interface to the
database, e.g. ArcticConnector (see pastastore.connectors)
name : str, optional
name of the PastaStore, by default takes the name of the Connector object
"""
def __init__(
    self,
    connector: Optional["BaseConnector"] = None,
    name: Optional[str] = None,
):
    """Initialize a PastaStore for managing pastas time series and models.

    Parameters
    ----------
    connector : Connector object, optional
        provides the connection to the database. Default is None, which
        creates an in-memory DictConnector that does not store data on disk.
    name : str, optional
        name of the PastaStore, if not provided the Connector name is used
    """
    # guard against the pre-v1.1 call signature PastaStore(name, connector)
    if isinstance(connector, str):
        raise DeprecationWarning(
            "PastaStore expects the connector as the first argument since v1.1!"
        )
    self.conn = DictConnector("pastas_db") if connector is None else connector
    self.name = self.conn.name if name is None else name
    # expose the connector's public methods (get_oseries, add_model, ...)
    # directly on this object
    self._register_connector_methods()
    # attach map, plot and yaml helper interfaces
    self.maps = Maps(self)
    self.plots = Plots(self)
    self.yaml = PastastoreYAML(self)
def _register_connector_methods(self):
"""Internal method for registering connector methods."""
methods = [
func
for func in dir(self.conn)
if callable(getattr(self.conn, func)) and not func.startswith("_")
]
for meth in methods:
setattr(self, meth, getattr(self.conn, meth))
# The properties below transparently forward read access to the attached
# Connector object.

@property
def oseries(self):
    """Overview of stored oseries (forwarded from the connector)."""
    return self.conn.oseries

@property
def stresses(self):
    """Overview of stored stresses (forwarded from the connector)."""
    return self.conn.stresses

@property
def models(self):
    """Stored models (forwarded from the connector)."""
    return self.conn.models

@property
def oseries_names(self):
    """Names of stored oseries (forwarded from the connector)."""
    return self.conn.oseries_names

@property
def stresses_names(self):
    """Names of stored stresses (forwarded from the connector)."""
    return self.conn.stresses_names

@property
def model_names(self):
    """Names of stored models (forwarded from the connector)."""
    return self.conn.model_names

@property
def _modelnames_cache(self):
    """Internal model-name cache (forwarded from the connector)."""
    return self.conn._modelnames_cache

@property
def n_oseries(self):
    """Number of stored oseries (forwarded from the connector)."""
    return self.conn.n_oseries

@property
def n_stresses(self):
    """Number of stored stresses (forwarded from the connector)."""
    return self.conn.n_stresses

@property
def n_models(self):
    """Number of stored models (forwarded from the connector)."""
    return self.conn.n_models

@property
def oseries_models(self):
    """Models per oseries (forwarded from the connector)."""
    return self.conn.oseries_models

@property
def oseries_with_models(self):
    """Oseries that have models (forwarded from the connector)."""
    return self.conn.oseries_with_models
def __repr__(self):
"""Representation string of the object."""
return f"<PastaStore> {self.name}: \n - " + self.conn.__str__()
[docs]
def get_oseries_distances(
self, names: Optional[Union[list, str]] = None
) -> FrameorSeriesUnion:
"""Method to obtain the distances in meters between the oseries.
Parameters
----------
names: str or list of str
names of the oseries to calculate distances between
Returns
-------
distances: pandas.DataFrame
Pandas DataFrame with the distances between the oseries
"""
oseries_df = self.conn.oseries
other_df = self.conn.oseries
names = self.conn._parse_names(names)
xo = pd.to_numeric(oseries_df.loc[names, "x"])
xt = pd.to_numeric(other_df.loc[:, "x"])
yo = pd.to_numeric(oseries_df.loc[names, "y"])
yt = pd.to_numeric(other_df.loc[:, "y"])
xh, xi = np.meshgrid(xt, xo)
yh, yi = np.meshgrid(yt, yo)
distances = pd.DataFrame(
np.sqrt((xh - xi) ** 2 + (yh - yi) ** 2),
index=names,
columns=other_df.index,
)
return distances
[docs]
def get_nearest_oseries(
self,
names: Optional[Union[list, str]] = None,
n: int = 1,
maxdist: Optional[float] = None,
) -> FrameorSeriesUnion:
"""Method to obtain the nearest (n) oseries.
Parameters
----------
names: str or list of str
string or list of strings with the name(s) of the oseries
n: int
number of oseries to obtain
maxdist : float, optional
maximum distance to consider
Returns
-------
oseries:
list with the names of the oseries.
"""
distances = self.get_oseries_distances(names)
if maxdist is not None:
distances = distances.where(distances <= maxdist, np.nan)
data = pd.DataFrame(columns=np.arange(n))
for series_name in distances.index:
others = distances.loc[series_name].dropna().sort_values().index.tolist()
# remove self
others.remove(series_name)
series = pd.DataFrame(
index=[series_name], columns=data.columns, data=[others[:n]]
)
data = pd.concat([data, series], axis=0)
return data
[docs]
def get_distances(
self,
oseries: Optional[Union[list, str]] = None,
stresses: Optional[Union[list, str]] = None,
kind: Optional[Union[str, List[str]]] = None,
) -> FrameorSeriesUnion:
"""Method to obtain the distances in meters between the oseries and
stresses.
Parameters
----------
oseries: str or list of str
name(s) of the oseries
stresses: str or list of str
name(s) of the stresses
kind: str, list of str
string or list of strings representing which kind(s) of
stresses to consider
Returns
-------
distances: pandas.DataFrame
Pandas DataFrame with the distances between the oseries (index)
and the stresses (columns).
"""
oseries_df = self.conn.oseries
stresses_df = self.conn.stresses
oseries = self.conn._parse_names(oseries)
if stresses is None and kind is None:
stresses = stresses_df.index
elif stresses is not None and kind is not None:
if isinstance(kind, str):
kind = [kind]
mask = stresses_df.kind.isin(kind)
stresses = stresses_df.loc[stresses].loc[mask].index
elif stresses is None:
if isinstance(kind, str):
kind = [kind]
stresses = stresses_df.loc[stresses_df.kind.isin(kind)].index
xo = pd.to_numeric(oseries_df.loc[oseries, "x"])
xt = pd.to_numeric(stresses_df.loc[stresses, "x"])
yo = pd.to_numeric(oseries_df.loc[oseries, "y"])
yt = pd.to_numeric(stresses_df.loc[stresses, "y"])
xh, xi = np.meshgrid(xt, xo)
yh, yi = np.meshgrid(yt, yo)
distances = pd.DataFrame(
np.sqrt((xh - xi) ** 2 + (yh - yi) ** 2),
index=oseries,
columns=stresses,
)
return distances
[docs]
def get_nearest_stresses(
self,
oseries: Optional[Union[list, str]] = None,
stresses: Optional[Union[list, str]] = None,
kind: Optional[Union[list, str]] = None,
n: int = 1,
maxdist: Optional[float] = None,
) -> FrameorSeriesUnion:
"""Method to obtain the nearest (n) stresses of a specific kind.
Parameters
----------
oseries: str
string with the name of the oseries
stresses: str or list of str
string with the name of the stresses
kind: str, list of str, optional
string or list of str with the name of the kind(s)
of stresses to consider
n: int
number of stresses to obtain
maxdist : float, optional
maximum distance to consider
Returns
-------
stresses:
list with the names of the stresses.
"""
distances = self.get_distances(oseries, stresses, kind)
if maxdist is not None:
distances = distances.where(distances <= maxdist, np.nan)
data = pd.DataFrame(columns=np.arange(n))
for series in distances.index:
series = pd.DataFrame(
[distances.loc[series].dropna().sort_values().index[:n]]
)
data = pd.concat([data, series], axis=0)
return data
[docs]
def get_signatures(
self,
signatures=None,
names=None,
libname="oseries",
progressbar=False,
ignore_errors=False,
):
"""Get groundwater signatures. NaN-values are returned when the
signature could not be computed.
Parameters
----------
signatures : list of str, optional
list of groundwater signatures to compute, if None all groundwater
signatures in ps.stats.signatures.__all__ are used, by default None
names : str, list of str, or None, optional
names of the time series, by default None which
uses all the time series in the library
libname : str
name of the library containing the time series
('oseries' or 'stresses'), by default "oseries"
progressbar : bool, optional
show progressbar, by default False
ignore_errors : bool, optional
ignore errors when True, i.e. when non-existent timeseries is
encountered in names, by default False
Returns
-------
signatures_df : pandas.DataFrame
DataFrame containing the signatures (columns) per time series (rows)
"""
names = self.conn._parse_names(names, libname=libname)
if signatures is None:
signatures = ps.stats.signatures.__all__.copy()
# create dataframe for results
signatures_df = pd.DataFrame(index=names, columns=signatures, data=np.nan)
# loop through oseries names
desc = "Get groundwater signatures"
for name in tqdm(names, desc=desc) if progressbar else names:
try:
if libname == "oseries":
s = self.conn.get_oseries(name)
else:
s = self.conn.get_stresses(name)
except Exception as e:
if ignore_errors:
signatures_df.loc[name, :] = np.nan
continue
else:
raise e
try:
i_signatures = ps.stats.signatures.summary(s.squeeze(), signatures)
except Exception as e:
if ignore_errors:
i_signatures = []
for signature in signatures:
try:
sign_val = getattr(ps.stats.signatures, signature)(
s.squeeze()
)
except Exception as _:
sign_val = np.nan
i_signatures.append(sign_val)
else:
raise e
signatures_df.loc[name, signatures] = i_signatures.squeeze()
return signatures_df
[docs]
def get_tmin_tmax(self, libname, names=None, progressbar=False):
"""Get tmin and tmax for time series.
Parameters
----------
libname : str
name of the library containing the time series
('oseries' or 'stresses')
names : str, list of str, or None, optional
names of the time series, by default None which
uses all the time series in the library
progressbar : bool, optional
show progressbar, by default False
Returns
-------
tmintmax : pd.dataframe
Dataframe containing tmin and tmax per time series
"""
names = self.conn._parse_names(names, libname=libname)
tmintmax = pd.DataFrame(
index=names, columns=["tmin", "tmax"], dtype="datetime64[ns]"
)
desc = f"Get tmin/tmax {libname}"
for n in tqdm(names, desc=desc) if progressbar else names:
if libname == "oseries":
s = self.conn.get_oseries(n)
else:
s = self.conn.get_stresses(n)
tmintmax.loc[n, "tmin"] = s.first_valid_index()
tmintmax.loc[n, "tmax"] = s.last_valid_index()
return tmintmax
def get_extent(self, libname, names=None, buffer=0.0):
names = self.conn._parse_names(names, libname=libname)
if libname in ["oseries", "stresses"]:
df = getattr(self, libname)
elif libname == "models":
df = self.oseries
else:
raise ValueError(f"Cannot get extent for library '{libname}'.")
extent = [
df.loc[names, "x"].min() - buffer,
df.loc[names, "x"].max() + buffer,
df.loc[names, "y"].min() - buffer,
df.loc[names, "y"].max() + buffer,
]
return extent
[docs]
def get_parameters(
self,
parameters: Optional[List[str]] = None,
modelnames: Optional[List[str]] = None,
param_value: Optional[str] = "optimal",
progressbar: Optional[bool] = False,
ignore_errors: Optional[bool] = False,
) -> FrameorSeriesUnion:
"""Get model parameters. NaN-values are returned when the parameters
are not present in the model or the model is not optimized.
Parameters
----------
parameters : list of str, optional
names of the parameters, by default None which uses all
parameters from each model
modelnames : str or list of str, optional
name(s) of model(s), by default None in which case all models
are used
param_value : str, optional
which column to use from the model parameters dataframe, by
default "optimal" which retrieves the optimized parameters.
progressbar : bool, optional
show progressbar, default is False
ignore_errors : bool, optional
ignore errors when True, i.e. when non-existent model is
encountered in modelnames, by default False
Returns
-------
p : pandas.DataFrame
DataFrame containing the parameters (columns) per model (rows)
"""
modelnames = self.conn._parse_names(modelnames, libname="models")
# create dataframe for results
p = pd.DataFrame(index=modelnames, columns=parameters)
# loop through model names and store results
desc = "Get model parameters"
for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
try:
mldict = self.get_models(mlname, return_dict=True, progressbar=False)
except Exception as e:
if ignore_errors:
p.loc[mlname, :] = np.nan
continue
else:
raise e
if parameters is None:
pindex = mldict["parameters"].index
else:
pindex = parameters
for c in pindex:
p.loc[mlname, c] = mldict["parameters"].loc[c, param_value]
p = p.squeeze()
return p.astype(float)
[docs]
def get_statistics(
self,
statistics: Union[str, List[str]],
modelnames: Optional[List[str]] = None,
progressbar: Optional[bool] = False,
ignore_errors: Optional[bool] = False,
**kwargs,
) -> FrameorSeriesUnion:
"""Get model statistics.
Parameters
----------
statistics : str or list of str
statistic or list of statistics to calculate, e.g. ["evp", "rsq", "rmse"],
for a full list see `pastas.modelstats.Statistics.ops`.
modelnames : list of str, optional
modelnames to calculates statistics for, by default None, which
uses all models in the store
progressbar : bool, optional
show progressbar, by default False
ignore_errors : bool, optional
ignore errors when True, i.e. when trying to calculate statistics
for non-existent model in modelnames, default is False
**kwargs
any arguments that can be passed to the methods for calculating
statistics
Returns
-------
s : pandas.DataFrame
"""
modelnames = self.conn._parse_names(modelnames, libname="models")
# if statistics is str
if isinstance(statistics, str):
statistics = [statistics]
# create dataframe for results
s = pd.DataFrame(index=modelnames, columns=statistics, data=np.nan)
# loop through model names
desc = "Get model statistics"
for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
try:
ml = self.get_models(mlname, progressbar=False)
except Exception as e:
if ignore_errors:
continue
else:
raise e
for stat in statistics:
value = ml.stats.__getattribute__(stat)(**kwargs)
s.loc[mlname, stat] = value
s = s.squeeze()
return s.astype(float)
[docs]
def create_model(
self,
name: str,
modelname: str = None,
add_recharge: bool = True,
recharge_name: str = "recharge",
) -> ps.Model:
"""Create a pastas Model.
Parameters
----------
name : str
name of the oseries to create a model for
modelname : str, optional
name of the model, default is None, which uses oseries name
add_recharge : bool, optional
add recharge to the model by looking for the closest
precipitation and evaporation time series in the stresses
library, by default True
recharge_name : str
name of the RechargeModel
Returns
-------
pastas.Model
model for the oseries
Raises
------
KeyError
if data is stored as dataframe and no column is provided
ValueError
if time series is empty
"""
# get oseries metadata
meta = self.conn.get_metadata("oseries", name, as_frame=False)
ts = self.conn.get_oseries(name)
# convert to Timeseries and create model
if not ts.dropna().empty:
if modelname is None:
modelname = name
ml = ps.Model(ts, name=modelname, metadata=meta)
if add_recharge:
self.add_recharge(ml, recharge_name=recharge_name)
return ml
else:
raise ValueError("Empty time series!")
[docs]
def create_models_bulk(
self,
oseries: Optional[Union[list, str]] = None,
add_recharge: bool = True,
solve: bool = False,
store_models: bool = True,
ignore_errors: bool = False,
progressbar: bool = True,
**kwargs,
) -> Union[Tuple[dict, dict], dict]:
"""Bulk creation of pastas models.
Parameters
----------
oseries : list of str, optional
names of oseries to create models for, by default None,
which creates models for all oseries
add_recharge : bool, optional
add recharge to the models based on closest
precipitation and evaporation time series, by default True
solve : bool, optional
solve the model, by default False
store_models : bool, optional
if False, return a list of models, by default True, which will
store the models in the database.
ignore_errors : bool, optional
ignore errors while creating models, by default False
progressbar : bool, optional
show progressbar, by default True
Returns
-------
models : dict, if return_models is True
dictionary of models
errors : list, always returned
list of model names that could not be created
"""
if oseries is None:
oseries = self.conn.oseries.index
elif isinstance(oseries, str):
oseries = [oseries]
models = {}
errors = {}
desc = "Bulk creation models"
for o in tqdm(oseries, desc=desc) if progressbar else oseries:
try:
iml = self.create_model(o, add_recharge=add_recharge)
except Exception as e:
if ignore_errors:
errors[o] = e
continue
else:
raise e
if solve:
iml.solve(**kwargs)
if store_models:
self.conn.add_model(iml, overwrite=True)
else:
models[o] = iml
if len(errors) > 0:
print("Warning! Errors occurred while creating models!")
if store_models:
return errors
else:
return models, errors
[docs]
def add_recharge(
    self,
    ml: ps.Model,
    rfunc=None,
    recharge=None,
    recharge_name: str = "recharge",
) -> None:
    """Add a RechargeModel to a pastas model (modifies `ml` in place).

    Uses the precipitation and evaporation time series in the database
    that are closest to the model's oseries. These are assumed to be
    labeled with kind='prec' and kind='evap' in the stresses library.

    Parameters
    ----------
    ml : pastas.Model
        pastas.Model object to attach the recharge stress model to
    rfunc : pastas.rfunc, optional
        response function to use for recharge in model, by default None
        which uses ps.Exponential() (for different response functions, see
        pastas documentation)
    recharge : ps.RechargeModel
        recharge model to use, default is ps.rch.Linear()
    recharge_name : str
        name of the RechargeModel

    Raises
    ------
    ValueError
        if no 'prec'/'evap' stresses exist in the store, or if no nearest
        stress can be found (e.g. because of missing coordinates)
    """
    # fail fast when the required stress kinds are missing from the store
    if "prec" not in self.stresses.kind.values:
        raise ValueError(
            "No stresses with kind='prec' found in store. "
            "add_recharge() requires stresses with kind='prec'!"
        )
    if "evap" not in self.stresses.kind.values:
        raise ValueError(
            "No stresses with kind='evap' found in store. "
            "add_recharge() requires stresses with kind='evap'!"
        )
    names = []
    for var in ("prec", "evap"):
        try:
            # nearest stress of this kind to the model's oseries location
            name = self.get_nearest_stresses(ml.oseries.name, kind=var).iloc[0, 0]
        except AttributeError:
            msg = "No precipitation or evaporation time series found!"
            raise Exception(msg)
        # a float result means the nearest-stress lookup yielded NaN
        # (no valid neighbor, typically missing x/y coordinates)
        if isinstance(name, float):
            if np.isnan(name):
                raise ValueError(
                    f"Unable to find nearest '{var}' stress! "
                    "Check x and y coordinates."
                )
        else:
            names.append(name)
    if len(names) == 0:
        msg = "No precipitation or evaporation time series found!"
        raise Exception(msg)
    # fetch both stress series and their metadata from the store
    tsdict = self.conn.get_stresses(names)
    metadata = self.conn.get_metadata("stresses", names, as_frame=False)
    # build the recharge stress model; names[0] is precipitation and
    # names[1] evaporation (order of the loop above)
    rch = ps.RechargeModel(
        tsdict[names[0]],
        tsdict[names[1]],
        rfunc=rfunc,
        name=recharge_name,
        recharge=recharge,
        settings=("prec", "evap"),
        metadata=metadata,
    )
    ml.add_stressmodel(rch)
[docs]
def solve_models(
    self,
    mls: Optional[Union["ps.Model", list, str]] = None,
    report: bool = False,
    ignore_solve_errors: bool = False,
    store_result: bool = True,
    progressbar: bool = True,
    **kwargs,
) -> None:
    """Solve the models in the store.

    Parameters
    ----------
    mls : list of str, optional
        list of model names, if None all models in the pastastore
        are solved.
    report : boolean, optional
        determines if a report is printed when the model is solved,
        default is False
    ignore_solve_errors : boolean, optional
        if True, errors emerging from the solve method are ignored,
        default is False which will raise an exception when a model
        cannot be optimized
    store_result : bool, optional
        if True save optimized models, default is True
    progressbar : bool, optional
        show progressbar, default is True
    **kwargs :
        arguments are passed to the solve method; pandas.Series values
        are looked up per model name
    """
    if mls is None:
        mls = self.conn.model_names
    elif isinstance(mls, ps.Model):
        mls = [mls.name]
    iterable = tqdm(mls, desc="Solving models") if progressbar else mls
    for ml_name in iterable:
        ml = self.conn.get_models(ml_name)
        # per-model kwargs: Series values are indexed by model name
        m_kwargs = {
            key: value.loc[ml_name] if isinstance(value, pd.Series) else value
            for key, value in kwargs.items()
        }
        # ensure tmin/tmax are Timestamps
        for tstamp in ("tmin", "tmax"):
            if tstamp in m_kwargs:
                m_kwargs[tstamp] = pd.Timestamp(m_kwargs[tstamp])
        try:
            ml.solve(report=report, **m_kwargs)
            if store_result:
                self.conn.add_model(ml, overwrite=True)
        except Exception as e:
            if ignore_solve_errors:
                warning = "solve error ignored for -> {}".format(ml.name)
                ps.logger.warning(warning)
            else:
                raise e
[docs]
def model_results(
self,
mls: Optional[Union[ps.Model, list, str]] = None,
progressbar: bool = True,
): # pragma: no cover
"""Get pastas model results.
Parameters
----------
mls : list of str, optional
list of model names, by default None which means results for
all models will be calculated
progressbar : bool, optional
show progressbar, by default True
Returns
-------
results : pd.DataFrame
dataframe containing parameters and other statistics
for each model
Raises
------
ModuleNotFoundError
if the art_tools module is not available
"""
try:
from art_tools import pastas_get_model_results
except Exception:
raise ModuleNotFoundError("You need 'art_tools' to use this method!")
if mls is None:
mls = self.conn.models
elif isinstance(mls, ps.Model):
mls = [mls.name]
results_list = []
desc = "Get model results"
for mlname in tqdm(mls, desc=desc) if progressbar else mls:
try:
iml = self.conn.get_models(mlname)
except Exception as e:
print("{1}: '{0}' could not be parsed!".format(mlname, e))
continue
iresults = pastas_get_model_results(
iml, par_selection="all", stats=("evp",), stderrors=True
)
results_list.append(iresults)
return pd.concat(results_list, axis=1).transpose()
[docs]
def to_zip(self, fname: str, overwrite=False, progressbar: bool = True):
"""Write data to zipfile.
Parameters
----------
fname : str
name of zipfile
overwrite : bool, optional
if True, overwrite existing file
progressbar : bool, optional
show progressbar, by default True
"""
from zipfile import ZIP_DEFLATED, ZipFile
if os.path.exists(fname) and not overwrite:
raise FileExistsError(
"File already exists! " "Use 'overwrite=True' to " "force writing file."
)
elif os.path.exists(fname):
warnings.warn(f"Overwriting file '{os.path.basename(fname)}'")
with ZipFile(fname, "w", compression=ZIP_DEFLATED) as archive:
# oseries
self.conn._series_to_archive(archive, "oseries", progressbar=progressbar)
# stresses
self.conn._series_to_archive(archive, "stresses", progressbar=progressbar)
# models
self.conn._models_to_archive(archive, progressbar=progressbar)
[docs]
def export_model_series_to_csv(
    self,
    names: Optional[Union[list, str]] = None,
    exportdir: str = ".",
    exportmeta: bool = True,
):  # pragma: no cover
    """Export the time series used by models to csv files.

    Writes one csv per oseries/stress used by each selected model and,
    optionally, one combined metadata csv per model.

    Parameters
    ----------
    names : Optional[Union[list, str]], optional
        names of models to export, by default None, which retrieves
        all models from database
    exportdir : str, optional
        directory to export csv files to, default is current directory
    exportmeta : bool, optional
        export metadata for all time series as csv file, default is True
    """
    names = self.conn._parse_names(names, libname="models")
    for name in names:
        mldict = self.get_models(name, return_dict=True)
        # export the observation series the model was built on
        oname = mldict["oseries"]["name"]
        o = self.get_oseries(oname)
        o.to_csv(os.path.join(exportdir, f"{oname}.csv"))
        if exportmeta:
            metalist = [self.get_metadata("oseries", oname)]
        for sm in mldict["stressmodels"]:
            if mldict["stressmodels"][sm]["stressmodel"] == "RechargeModel":
                # recharge models store their stresses under 'prec'/'evap'
                for istress in ["prec", "evap"]:
                    # NOTE(review): loop variable is rebound here from the
                    # key ('prec'/'evap') to the stress dict itself
                    istress = mldict["stressmodels"][sm][istress]
                    stress_name = istress["name"]
                    ts = self.get_stresses(stress_name)
                    ts.to_csv(os.path.join(exportdir, f"{stress_name}.csv"))
                    if exportmeta:
                        tsmeta = self.get_metadata("stresses", stress_name)
                        metalist.append(tsmeta)
            else:
                # other stressmodels keep their stresses in a list under
                # the 'stress' key
                for istress in mldict["stressmodels"][sm]["stress"]:
                    stress_name = istress["name"]
                    ts = self.get_stresses(stress_name)
                    ts.to_csv(os.path.join(exportdir, f"{stress_name}.csv"))
                    if exportmeta:
                        tsmeta = self.get_metadata("stresses", stress_name)
                        metalist.append(tsmeta)
        if exportmeta:
            # one csv containing the metadata of all series of this model
            pd.concat(metalist, axis=0).to_csv(
                os.path.join(exportdir, f"metadata_{name}.csv")
            )
[docs]
@classmethod
def from_zip(
    cls,
    fname: str,
    conn: Optional[BaseConnector] = None,
    storename: Optional[str] = None,
    progressbar: bool = True,
):
    """Load PastaStore from a zipfile (counterpart of `to_zip`).

    Parameters
    ----------
    fname : str
        pathname of zipfile
    conn : Connector object, optional
        connector for storing loaded data, default is None which creates a
        DictConnector. This Connector does not store data on disk.
    storename : str, optional
        name of the PastaStore, by default None, which
        defaults to the name of the Connector.
    progressbar : bool, optional
        show progressbar, by default True

    Returns
    -------
    pastastore.PastaStore
        return PastaStore containing data from zipfile
    """
    from zipfile import ZipFile

    if conn is None:
        conn = DictConnector("pastas_db")
    with ZipFile(fname, "r") as archive:
        # metadata files are loaded together with their series below,
        # so skip them in the main loop
        namelist = [
            fi for fi in archive.namelist() if not fi.endswith("_meta.json")
        ]
        for f in tqdm(namelist, desc="Reading zip") if progressbar else namelist:
            # archive layout: "<libname>/<name>.json"
            libname, fjson = os.path.split(f)
            if libname in ["stresses", "oseries"]:
                s = pd.read_json(archive.open(f), dtype=float, orient="columns")
                if not isinstance(s.index, pd.DatetimeIndex):
                    # assumes the JSON index holds epoch milliseconds --
                    # presumably matching how the series were serialized;
                    # verify against the archive writer if this changes
                    s.index = pd.to_datetime(s.index, unit="ms")
                s = s.sort_index()
                # companion metadata file for this series
                meta = json.load(archive.open(f.replace(".json", "_meta.json")))
                conn._add_series(libname, s, fjson.split(".")[0], metadata=meta)
            elif libname in ["models"]:
                # models are stored as pastas .pas (JSON) files
                ml = json.load(archive.open(f), object_hook=pastas_hook)
                conn.add_model(ml)
    if storename is None:
        storename = conn.name
    return cls(conn, storename)
[docs]
def search(
self,
libname: str,
s: Optional[Union[list, str]] = None,
case_sensitive: bool = True,
sort=True,
):
"""Search for names of time series or models starting with `s`.
Parameters
----------
libname : str
name of the library to search in
s : str, lst
find names with part of this string or strings in list
case_sensitive : bool, optional
whether search should be case sensitive, by default True
sort : bool, optional
sort list of names
Returns
-------
matches : list
list of names that match search result
"""
if libname == "models":
lib_names = getattr(self, "model_names")
elif libname == "stresses":
lib_names = getattr(self, "stresses_names")
elif libname == "oseries":
lib_names = getattr(self, "oseries_names")
else:
raise ValueError("Provide valid libname: 'models', 'stresses' or 'oseries'")
if isinstance(s, str):
if case_sensitive:
matches = [n for n in lib_names if s in n]
else:
matches = [n for n in lib_names if s.lower() in n.lower()]
if isinstance(s, list):
m = np.array([])
for sub in s:
if case_sensitive:
m = np.append(m, [n for n in lib_names if sub in n])
else:
m = np.append(m, [n for n in lib_names if sub.lower() in n.lower()])
matches = list(np.unique(m))
if sort:
matches.sort()
return matches
[docs]
def get_model_timeseries_names(
    self,
    modelnames: Optional[Union[list, str]] = None,
    dropna: bool = True,
    progressbar: bool = True,
) -> FrameorSeriesUnion:
    """Get the names of the time series contained in each model.

    Parameters
    ----------
    modelnames : Optional[Union[list, str]], optional
        list or name of models to get time series names for,
        by default None which will use all modelnames
    dropna : bool, optional
        drop stresses from table if stress is not included in any
        model, by default True
    progressbar : bool, optional
        show progressbar, by default True

    Returns
    -------
    structure : pandas.DataFrame
        DataFrame with models as index, the oseries name per model in the
        'oseries' column, and a 1 in a stress column when that stress is
        used by the model.
    """
    model_names = self.conn._parse_names(modelnames, libname="models")
    # one column per known stress plus the oseries column
    structure = pd.DataFrame(
        index=model_names, columns=["oseries"] + self.stresses_names
    )
    for mlnam in (
        tqdm(model_names, desc="Get model time series names")
        if progressbar
        else model_names
    ):
        iml = self.get_models(mlnam, return_dict=True)
        # model files written by pastas <= 0.22 store the stressmodel type
        # under the key 'stressmodel', newer versions under 'class'
        PASFILE_LEQ_022 = parse_version(
            iml["file_info"]["pastas_version"]
        ) <= parse_version("0.22.0")
        # oseries
        structure.loc[mlnam, "oseries"] = iml["oseries"]["name"]
        for sm in iml["stressmodels"].values():
            class_key = "stressmodel" if PASFILE_LEQ_022 else "class"
            if sm[class_key] == "RechargeModel":
                # recharge models reference their stresses as 'prec'/'evap'
                pnam = sm["prec"]["name"]
                enam = sm["evap"]["name"]
                structure.loc[mlnam, pnam] = 1
                structure.loc[mlnam, enam] = 1
            elif "stress" in sm:
                # other stressmodels hold a single stress dict or a list
                smstress = sm["stress"]
                if isinstance(smstress, dict):
                    smstress = [smstress]
                for s in smstress:
                    structure.loc[mlnam, s["name"]] = 1
    if dropna:
        # drop stresses not used by any of the selected models
        return structure.dropna(how="all", axis=1)
    else:
        return structure
[docs]
def apply(self, libname, func, names=None, progressbar=True):
"""Apply function to items in library.
Supported libraries are oseries, stresses, and models.
Parameters
----------
libname : str
library name, supports "oseries", "stresses" and "models"
func : callable
function that accepts items from one of the supported libraries as input
names : str, list of str, optional
apply function to these names, by default None which loops over all stored
items in library
progressbar : bool, optional
show progressbar, by default True
Returns
-------
dict
dict of results of func, with names as keys and results as values
"""
names = self.conn._parse_names(names, libname)
result = {}
if libname not in ("oseries", "stresses", "models"):
raise ValueError(
"'libname' must be one of ['oseries', 'stresses', 'models']!"
)
getter = getattr(self.conn, f"get_{libname}")
for n in (
tqdm(names, desc=f"Applying {func.__name__}") if progressbar else names
):
result[n] = func(getter(n))
return result
def within(self, extent, names=None, libname="oseries"):
xmin, xmax, ymin, ymax = extent
names = self.conn._parse_names(names, libname)
if libname == "oseries":
df = self.oseries.loc[names]
elif libname == "stresses":
df = self.stresses.loc[names]
elif libname == "models":
onames = np.unique(
[
self.get_models(modelname, return_dict=True)["oseries"]["name"]
for modelname in names
]
)
df = self.oseries.loc[onames]
else:
raise ValueError(
"libname must be one of ['oseries', 'stresses', 'models']"
f", got '{libname}'"
)
mask = (
(df["x"] <= xmax)
& (df["x"] >= xmin)
& (df["y"] >= ymin)
& (df["y"] <= ymax)
)
return df.loc[mask].index.tolist()