"""Module containing the PastaStore object for managing time series and models."""
import json
import logging
import os
import warnings
from collections.abc import Iterable
from functools import partial
from pathlib import Path
from typing import Any, Callable
import numpy as np
import pandas as pd
import pastas as ps
from pastas.io.pas import pastas_hook
from pastastore._tqdm import tqdm
from pastastore.base import BaseConnector
from pastastore.connectors import ArcticDBConnector, DictConnector, PasConnector
from pastastore.plotting import Maps, Plots
from pastastore.styling import boolean_styler
from pastastore.typing import DataFrameOrSeries, PastasLibs, TimeSeriesLibs
from pastastore.util import ZipUtils, _custom_warning
from pastastore.version import PASTAS_GEQ_150
from pastastore.yaml_interface import PastastoreYAML
warnings.showwarning = _custom_warning
logger = logging.getLogger(__name__)
[docs]
class PastaStore:
"""PastaStore object for managing pastas time series and models.
Requires a Connector object to provide the interface to
the database. Different Connectors are available, e.g.:
- PasConnector for storing all data as .pas (JSON) files on disk (recommended)
- ArcticDBConnector for saving data on disk using arcticdb package
- DictConnector for storing all data in dictionaries (in-memory)
Parameters
----------
connector : Connector object
object that provides the interface to the
database, e.g. ArcticConnector (see pastastore.connectors)
name : str, optional
name of the PastaStore, by default takes the name of the Connector object
"""
_accessors = set()
def __init__(
self,
connector: BaseConnector | None = None,
name: str | None = None,
):
"""Initialize PastaStore for managing pastas time series and models.
Parameters
----------
connector : Connector object, optional
object that provides the connection to the database. Default is None, which
will create a DictConnector. This default Connector does not store data on
disk.
name : str, optional
name of the PastaStore, if not provided uses the Connector name
"""
if isinstance(connector, str):
raise DeprecationWarning(
"PastaStore expects the connector as the first argument since v1.1!"
)
if connector is None:
connector = DictConnector("pastas_db")
self.conn = connector
self.validator = self.conn.validator
self.name = name if name is not None else self.conn.name
self._register_connector_methods()
# register map, plot, yaml and zip classes
self.maps = Maps(self)
self.plots = Plots(self)
self.yaml = PastastoreYAML(self)
self.zip = ZipUtils(self)
[docs]
@classmethod
def from_pastastore_config_file(cls, fname, update_path: bool = True):
"""Create a PastaStore from a pastastore config file.
Parameters
----------
fname : str
path to the pastastore config file
update_path : bool, optional
when True, use path derived from location of the config file instead of
the stored path in the config file. If a PastaStore is moved, the path
in the config file will probably still refer to the old location. Set to
False to read the file from the path listed in the config file. In that
case config files do not need to be stored within the correct directory.
Returns
-------
PastaStore
PastaStore
"""
with open(fname, "r", encoding="utf-8") as f:
cfg = json.load(f)
conn_type = cfg.pop("connector_type")
if conn_type == "pas":
# allow loading of pas files from config file that was moved
if update_path:
# update path to the config file
cfg["path"] = Path(fname).parent.parent
conn = PasConnector(**cfg)
elif conn_type == "arcticdb":
if update_path:
prefix, _ = cfg["uri"].split("://")
if prefix.lower() == "lmbd":
cfg["uri"] = prefix + "://" + str(Path(fname).parent.parent)
conn = ArcticDBConnector(**cfg)
else:
raise ValueError(
f"Cannot load connector type: '{conn_type}'. "
"This is only supported for PasConnector and ArcticDBConnector."
)
return cls(conn)
@property
def empty(self) -> bool:
"""Check if the PastaStore is empty."""
return self.conn.empty
def _register_connector_methods(self):
"""Register connector methods (internal method)."""
methods = [
func
for func in dir(self.conn)
if callable(getattr(self.conn, func)) and not func.startswith("_")
]
for meth in methods:
setattr(self, meth, getattr(self.conn, meth))
@property
def oseries(self):
"""
Returns the oseries metadata as dataframe.
Returns
-------
oseries
oseries metadata as dataframe
"""
return self.conn.oseries
@property
def stresses(self):
"""
Returns the stresses metadata as dataframe.
Returns
-------
stresses
stresses metadata as dataframe
"""
return self.conn.stresses
@property
def models(self):
"""Return the ModelAccessor object.
The ModelAccessor object allows dictionary-like assignment and access to models.
In addition it provides some useful utilities for working with stored models
in the database.
Examples
--------
Get a model by name::
>>> model = pstore.models["my_model"]
Store a model in the database::
>>> pstore.models["my_model_v2"] = model
Get model metadata dataframe::
>>> pstore.models.metadata
Number of models::
>>> len(pstore.models)
Random model::
>>> model = pstore.models.random()
Iterate over stored models::
>>> for ml in pstore.models:
>>> ml.solve()
Returns
-------
ModelAccessor
ModelAccessor object
"""
return self.conn.models
@property
def oseries_names(self):
"""Return list of oseries names.
Returns
-------
list
list of oseries names
"""
return self.conn.oseries_names
@property
def stresses_names(self):
"""Return list of streses names.
Returns
-------
list
list of stresses names
"""
return self.conn.stresses_names
@property
def model_names(self):
"""Return list of model names.
Returns
-------
list
list of model names
"""
return self.conn.model_names
@property
def _modelnames_cache(self):
# Backward-compat shim; prefer public property model_names
return self.conn.model_names
@property
def n_oseries(self):
"""Return number of oseries.
Returns
-------
int
number of oseries
"""
return self.conn.n_oseries
@property
def n_stresses(self):
"""Return number of stresses.
Returns
-------
int
number of stresses
"""
return self.conn.n_stresses
@property
def n_models(self):
"""Return number of models.
Returns
-------
int
number of models
"""
return self.conn.n_models
@property
def oseries_models(self):
"""Return dictionary of models per oseries.
Returns
-------
dict
dictionary containing list of models (values) for each oseries (keys).
"""
return self.conn.oseries_models
@property
def oseries_with_models(self):
"""Return list of oseries for which models are contained in the database.
Returns
-------
list
list of oseries names for which models are contained in the database.
"""
return self.conn.oseries_with_models
@property
def stresses_models(self):
"""Return dictionary of models per stress.
Returns
-------
dict
dictionary containing list of models (values) for each stress (keys).
"""
return self.conn.stresses_models
@property
def stresses_with_models(self):
"""Return list of stresses for which models are contained in the database.
Returns
-------
list
list of stress names for which models are contained in the database.
"""
return self.conn.stresses_with_models
def __repr__(self):
"""Representation string of the object."""
return f"<PastaStore> {self.name}: \n - " + self.conn.__str__()
[docs]
def get_oseries_distances(
self, names: list[str] | str | None = None
) -> DataFrameOrSeries:
"""Get the distances in meters between the oseries.
Parameters
----------
names: str | list[str]
names of the oseries to calculate distances between
Returns
-------
distances: pandas.DataFrame
Pandas DataFrame with the distances between the oseries
"""
oseries_df = self.conn.oseries
other_df = self.conn.oseries
names = self.conn.parse_names(names)
xo = pd.to_numeric(oseries_df.loc[names, "x"])
xt = pd.to_numeric(other_df.loc[:, "x"])
yo = pd.to_numeric(oseries_df.loc[names, "y"])
yt = pd.to_numeric(other_df.loc[:, "y"])
xh, xi = np.meshgrid(xt, xo)
yh, yi = np.meshgrid(yt, yo)
distances = pd.DataFrame(
np.sqrt((xh - xi) ** 2 + (yh - yi) ** 2),
index=names,
columns=other_df.index,
)
return distances
[docs]
def get_nearest_oseries(
self,
names: list[str] | str | None = None,
n: int = 1,
maxdist: float | None = None,
) -> DataFrameOrSeries:
"""Get the nearest (n) oseries.
Parameters
----------
names: str | list[str]
string or list of strings with the name(s) of the oseries
n: int
number of oseries to obtain
maxdist : float, optional
maximum distance to consider
Returns
-------
oseries:
list with the names of the oseries.
"""
distances = self.get_oseries_distances(names)
if maxdist is not None:
distances = distances.where(distances <= maxdist, np.nan)
data = pd.DataFrame(columns=np.arange(n))
for series_name in distances.index:
others = distances.loc[series_name].dropna().sort_values().index.tolist()
# remove self
others.remove(series_name)
series = pd.DataFrame(
index=[series_name], columns=data.columns, data=[others[:n]]
)
data = pd.concat([data, series], axis=0)
return data
[docs]
def get_distances(
self,
oseries: list[str] | str | None = None,
stresses: list[str] | str | None = None,
kind: list[str] | str | None = None,
) -> DataFrameOrSeries:
"""Get the distances in meters between the oseries and stresses.
Parameters
----------
oseries: str | list[str]
name(s) of the oseries
stresses: str | list[str]
name(s) of the stresses
kind: str | list[str]
string or list of strings representing which kind(s) of
stresses to consider
Returns
-------
distances: pandas.DataFrame
Pandas DataFrame with the distances between the oseries (index)
and the stresses (columns).
"""
oseries_df = self.conn.oseries
stresses_df = self.conn.stresses
oseries = self.conn.parse_names(oseries)
if stresses is None and kind is None:
stresses = stresses_df.index
elif stresses is not None and kind is not None:
if isinstance(kind, str):
kind = [kind]
mask = stresses_df.kind.isin(kind)
stresses = stresses_df.loc[stresses].loc[mask].index
elif stresses is None:
if isinstance(kind, str):
kind = [kind]
stresses = stresses_df.loc[stresses_df.kind.isin(kind)].index
xo = pd.to_numeric(oseries_df.loc[oseries, "x"])
xt = pd.to_numeric(stresses_df.loc[stresses, "x"])
yo = pd.to_numeric(oseries_df.loc[oseries, "y"])
yt = pd.to_numeric(stresses_df.loc[stresses, "y"])
xh, xi = np.meshgrid(xt, xo)
yh, yi = np.meshgrid(yt, yo)
distances = pd.DataFrame(
np.sqrt((xh - xi) ** 2 + (yh - yi) ** 2),
index=oseries,
columns=stresses,
)
return distances
[docs]
def get_nearest_stresses(
self,
oseries: list[str] | str | None = None,
stresses: list[str] | str | None = None,
kind: list[str] | str | None = None,
n: int = 1,
maxdist: float | None = None,
) -> DataFrameOrSeries:
"""Get the nearest (n) stresses of a specific kind.
Parameters
----------
oseries: str
string with the name of the oseries
stresses: str | list[str]
string with the name of the stresses
kind: str | list[str], optional
string or list of str with the name of the kind(s)
of stresses to consider
n: int
number of stresses to obtain
maxdist : float, optional
maximum distance to consider
Returns
-------
stresses:
list with the names of the stresses.
"""
distances = self.get_distances(oseries, stresses, kind)
if maxdist is not None:
distances = distances.where(distances <= maxdist, np.nan)
data = pd.DataFrame(columns=np.arange(n))
for series_name in distances.index:
nearest = distances.loc[series_name].dropna().sort_values().index[:n]
data.loc[series_name, range(len(nearest))] = nearest
return data
[docs]
def get_signatures(
self,
names: list[str] | str | None = None,
signatures: list[str] | None = None,
libname: TimeSeriesLibs = "oseries",
progressbar: bool = False,
ignore_errors: bool = False,
) -> pd.DataFrame | pd.Series:
"""Get groundwater signatures.
NaN-values are returned when the signature cannot be computed.
Parameters
----------
names : str | list[str], optional
names of the time series, by default None which
uses all the time series in the library
signatures : list[str], optional
list of groundwater signatures to compute, if None all groundwater
signatures in ps.stats.signatures.__all__ are used, by default None
libname : str
name of the library containing the time series
('oseries' or 'stresses'), by default "oseries"
progressbar : bool, optional
show progressbar, by default False
ignore_errors : bool, optional
ignore errors when True, i.e. when non-existent timeseries is
encountered in names, by default False
Returns
-------
signatures_df : pandas.DataFrame or pandas.Series
Containing the time series (columns) and the signatures (index).
Note
----
Names is set as the first argument to allow parallelization.
"""
names = self.conn.parse_names(names, libname=libname)
signatures = (
ps.stats.signatures.__all__.copy() if signatures is None else signatures
)
sigs = []
# loop through oseries names
for name in (
tqdm(names, desc="Get groundwater signatures") if progressbar else names
):
sig = pd.Series(np.nan, index=signatures, name=name)
try:
s = (
self.conn.get_oseries(name)
if libname == "oseries"
else self.conn.get_stresses(name)
)
except Exception as e:
if ignore_errors:
return sig
else:
raise e
try:
i_signatures: pd.Series = ps.stats.signatures.summary(
s.squeeze(), signatures
).squeeze("columns")
except Exception as e:
if ignore_errors:
i_signatures = []
for signature in signatures:
try:
sign_val = getattr(ps.stats.signatures, signature)(
s.squeeze()
)
except Exception as _:
sign_val = np.nan
i_signatures.append(sign_val)
else:
raise e
sig.loc[signatures] = i_signatures.values
sigs.append(sig)
if len(sigs) == 1:
return sigs[0]
else:
return pd.concat(sigs, axis=1)
[docs]
def get_tmin_tmax(
self,
libname: PastasLibs | None = None,
names: str | list[str] | None = None,
progressbar: bool = False,
) -> pd.DataFrame:
"""Get tmin and tmax for time series and/or models.
Parameters
----------
libname : str, optional
name of the library containing the time series ('oseries', 'stresses',
'models', or None), by default None which returns tmin/tmax for all
libraries
names : str | list[str], optional
names of the time series, by default None which
uses all the time series in the library
progressbar : bool, optional
show progressbar, by default False
Returns
-------
tmintmax : pd.dataframe
Dataframe containing tmin and tmax per time series and/or model
"""
results = {}
if libname is None:
libs = ["oseries", "stresses", "models"]
else:
libs = [libname]
for lib in libs:
_names = self.conn.parse_names(names, libname=lib)
tmintmax = pd.DataFrame(
index=pd.Index(names) if names else None,
columns=pd.Index(["tmin", "tmax"]),
dtype="datetime64[ns]",
)
desc = f"Get tmin/tmax {lib}"
for n in tqdm(_names, desc=desc) if progressbar else _names:
if lib == "models":
mld = self.conn.get_models(
n,
return_dict=True,
)
tmintmax.loc[n, "tmin"] = mld["settings"]["tmin"]
tmintmax.loc[n, "tmax"] = mld["settings"]["tmax"]
else:
s = (
self.conn.get_oseries(n)
if lib == "oseries"
else self.conn.get_stresses(n)
)
tmintmax.loc[n, "tmin"] = s.first_valid_index()
tmintmax.loc[n, "tmax"] = s.last_valid_index()
results[lib] = tmintmax
if len(results) == 1:
return results[lib]
else:
return pd.concat(results, axis=0)
[docs]
def get_extent(self, libname, names=None, buffer=0.0):
"""Get extent [xmin, xmax, ymin, ymax] from library.
Parameters
----------
libname : str
name of the library containing the time series
('oseries', 'stresses', 'models')
names : str | list[str], optional
list of names to include for computing the extent
buffer : float, optional
add this distance to the extent, by default 0.0
Returns
-------
extent : list
extent [xmin, xmax, ymin, ymax]
"""
names = self.conn.parse_names(names, libname=libname)
if libname in ["oseries", "stresses"]:
df = getattr(self, libname)
elif libname == "models":
df = self.oseries
else:
raise ValueError(f"Cannot get extent for library '{libname}'.")
extent = [
df.loc[names, "x"].min() - buffer,
df.loc[names, "x"].max() + buffer,
df.loc[names, "y"].min() - buffer,
df.loc[names, "y"].max() + buffer,
]
return extent
[docs]
def get_parameters(
self,
parameters: list[str] | None = None,
modelnames: list[str] | None = None,
param_value: str | None = "optimal",
progressbar: bool | None = False,
ignore_errors: bool | None = True,
) -> DataFrameOrSeries:
"""Get model parameters.
NaN-values are returned when the parameters are not present in the model or the
model is not optimized.
Parameters
----------
parameters : list[str], optional
names of the parameters, by default None which uses all
parameters from each model
modelnames : str | list[str], optional
name(s) of model(s), by default None in which case all models
are used
param_value : str, optional
which column to use from the model parameters dataframe, by
default "optimal" which retrieves the optimized parameters.
progressbar : bool, optional
show progressbar, default is False
ignore_errors : bool, optional
ignore errors when True, i.e. when non-existent model is
encountered in modelnames, by default True
Returns
-------
p : pandas.DataFrame
DataFrame containing the parameters (columns) per model (rows)
"""
modelnames = self.conn.parse_names(modelnames, libname="models")
# create dataframe for results
p = pd.DataFrame(
index=pd.Index(modelnames),
columns=pd.Index(parameters) if parameters else None,
dtype=float,
)
# loop through model names and store results
desc = "Get model parameters"
for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
try:
mldict = self.conn.get_models(
mlname, return_dict=True, progressbar=False
)
except Exception as e:
if ignore_errors:
p.loc[mlname, :] = np.nan
continue
else:
raise e
if parameters is None:
pindex = mldict["parameters"].index
else:
pindex = parameters
for c in pindex:
if c in mldict["parameters"].index:
p.loc[mlname, c] = mldict["parameters"].loc[c, param_value]
else:
p.loc[mlname, c] = np.nan
return p.astype(float).squeeze()
[docs]
def get_statistics(
self,
statistics: str | list[str],
modelnames: list[str] | None = None,
parallel: bool = False,
progressbar: bool = False,
ignore_errors: bool = False,
fancy_output: bool = True,
**kwargs,
) -> DataFrameOrSeries:
"""Get model statistics.
Parameters
----------
statistics : str | list[str]
statistic or list of statistics to calculate, e.g. ["evp", "rsq", "rmse"],
for a full list see `pastas.modelstats.Statistics.ops`.
modelnames : list[str], optional
modelnames to calculates statistics for, by default None, which
uses all models in the store
progressbar : bool, optional
show progressbar, by default False
ignore_errors : bool, optional
ignore errors when True, i.e. when trying to calculate statistics
for non-existent model in modelnames, default is False
parallel : bool, optional
use parallel processing, by default False
fancy_output : bool, optional
only read if parallel=True, if True, return as DataFrame with statistics,
otherwise return list of results
**kwargs
any arguments that can be passed to the methods for calculating
statistics
Returns
-------
s : pandas.DataFrame
"""
modelnames = self.conn.parse_names(modelnames, libname="models")
# if statistics is str
if isinstance(statistics, str):
statistics = [statistics]
if parallel:
kwargs["statistics"] = statistics
if self.conn.conn_type == "pas":
kwargs["connector"] = self.conn
return self.apply(
libname="models",
func=self.conn._get_statistics,
names=modelnames,
kwargs=kwargs,
parallel=parallel,
progressbar=progressbar,
fancy_output=fancy_output,
).transpose() # transpose to match serial output
else:
# create dataframe for results
s = pd.DataFrame(
index=pd.Index(modelnames), columns=pd.Index(statistics), data=np.nan
)
# loop through model names
desc = "Get model statistics"
for mlname in tqdm(modelnames, desc=desc) if progressbar else modelnames:
try:
ml = self.conn.get_models(mlname, progressbar=False)
except Exception as e:
if ignore_errors:
continue
else:
raise e
for stat in statistics:
value = getattr(ml.stats, stat)(**kwargs)
s.loc[mlname, stat] = value
return s.astype(float).squeeze()
[docs]
def create_model(
self,
name: str,
modelname: str | None = None,
add_recharge: bool = True,
add_ar_noisemodel: bool = False,
recharge_name: str = "recharge",
) -> ps.Model:
"""Create a pastas Model.
Parameters
----------
name : str
name of the oseries to create a model for
modelname : str, optional
name of the model, default is None, which uses oseries name
add_recharge : bool, optional
add recharge to the model by looking for the closest
precipitation and evaporation time series in the stresses
library, by default True
add_ar_noisemodel : bool, optional
add AR(1) noise model to the model, by default False
recharge_name : str
name of the RechargeModel
Returns
-------
pastas.Model
model for the oseries
Raises
------
KeyError
if data is stored as dataframe and no column is provided
ValueError
if time series is empty
"""
# get oseries metadata
meta = self.conn.get_metadata("oseries", name, as_frame=False)
ts = self.conn.get_oseries(name)
# convert to time series and create model
if not ts.dropna().empty:
if modelname is None:
modelname = name
ml = ps.Model(ts, name=modelname, metadata=meta)
if add_recharge:
self.add_recharge(ml, recharge_name=recharge_name)
if add_ar_noisemodel and PASTAS_GEQ_150:
ml.add_noisemodel(ps.ArNoiseModel())
return ml
else:
raise ValueError("Empty time series!")
[docs]
def create_models_bulk(
self,
oseries: list[str] | str | None = None,
add_recharge: bool = True,
solve: bool = False,
store_models: bool = True,
ignore_errors: bool = False,
suffix: str | None = None,
progressbar: bool = True,
**kwargs,
) -> tuple[dict, dict] | dict:
"""Bulk creation of pastas models.
Parameters
----------
oseries : list[str], optional
names of oseries to create models for, by default None,
which creates models for all oseries
add_recharge : bool, optional
add recharge to the models based on closest
precipitation and evaporation time series, by default True
solve : bool, optional
solve the model, by default False
store_models : bool, optional
if False, return a list of models, by default True, which will
store the models in the database.
ignore_errors : bool, optional
ignore errors while creating models, by default False
suffix : str, optional
add suffix to oseries name to create model name, by default None
progressbar : bool, optional
show progressbar, by default True
Returns
-------
models : dict, if return_models is True
dictionary of models
errors : list, always returned
list of model names that could not be created
"""
if oseries is None:
oseries = self.conn.oseries.index
elif isinstance(oseries, str):
oseries = [oseries]
models = {}
errors = {}
desc = "Bulk creation models"
for o in tqdm(oseries, desc=desc) if progressbar else oseries:
try:
if suffix is not None:
modelname = f"{o}{suffix}"
else:
modelname = o
iml = self.create_model(
o, modelname=modelname, add_recharge=add_recharge
)
except Exception as e:
if ignore_errors:
errors[o] = e
continue
else:
raise e
if solve:
iml.solve(**kwargs)
if store_models:
self.add_model(iml, overwrite=True)
else:
models[o] = iml
if len(errors) > 0:
logger.warning(
"Warning! Some models were not created. See errors dictionary "
"for more information."
)
if store_models:
return errors
else:
return models, errors
[docs]
def add_recharge(
self,
ml: ps.Model,
rfunc=None,
recharge=None,
recharge_name: str = "recharge",
) -> None:
"""Add recharge to a pastas model.
Uses closest precipitation and evaporation time series in database.
These are assumed to be labeled with kind = 'prec' or 'evap'.
Parameters
----------
ml : pastas.Model
pastas.Model object
rfunc : pastas.rfunc, optional
response function to use for recharge in model, by default None
which uses ps.Exponential() (for different response functions, see pastas
documentation)
recharge : ps.RechargeModel
recharge model to use, default is ps.rch.Linear()
recharge_name : str
name of the RechargeModel
"""
if recharge is None:
recharge = ps.rch.Linear()
if rfunc is None:
rfunc = ps.Exponential
self.add_stressmodel(
ml,
stresses={"prec": "nearest", "evap": "nearest"},
rfunc=rfunc,
stressmodel=ps.RechargeModel,
stressmodel_name=recharge_name,
recharge=recharge,
)
def _parse_stresses(
self,
stresses: str | list[str] | dict[str, str],
kind: str | None,
stressmodel,
oseries: str | None = None,
):
# parse stresses for RechargeModel, allow list of len 2 or 3 and
# set correct kwarg names
if stressmodel._name == "RechargeModel":
if isinstance(stresses, list):
if len(stresses) == 2:
stresses = {
"prec": stresses[0],
"evap": stresses[1],
}
elif len(stresses) == 3:
stresses = {
"prec": stresses[0],
"evap": stresses[1],
"temp": stresses[2],
}
else:
raise ValueError(
"RechargeModel requires 2 or 3 stress names, "
f"got: {len(stresses)}!"
)
# if stresses is list, create dictionary normally
elif isinstance(stresses, list):
stresses = {"stress": stresses}
# if stresses is str, make it a list of len 1
elif isinstance(stresses, str):
stresses = {"stress": [stresses]}
# check if stresses is a dictionary, else raise TypeError
if not isinstance(stresses, dict):
raise TypeError("stresses must be a list, string or dictionary!")
# if no kind specified, set to well for WellModel
if stressmodel._name == "WellModel":
if kind is None:
kind = "well"
# store a copy of the user input for kind
if isinstance(kind, list):
_kind = kind.copy()
else:
_kind = kind
# create empty list for gathering metadata
metadata = []
# loop over stresses keys/values
for i, (k, v) in enumerate(stresses.items()):
# if entry in dictionary is str, make it list of len 1
if isinstance(v, str):
v = [v]
# parse value
if isinstance(v, list):
for item in v:
names = [] # empty list for names
# parse nearest
if item.startswith("nearest"):
# check oseries defined if nearest option is used
if not oseries:
raise ValueError(
"Getting nearest stress(es) requires oseries name!"
)
try:
if len(item.split()) == 3: # nearest <n> <kind>
n = int(item.split()[1])
kind = item.split()[2]
elif len(item.split()) == 2: # nearest <n> | <kind>
try:
n = int(item.split()[1]) # try converting to <n>
except ValueError:
n = 1
kind = item.split()[1] # interpret as <kind>
else: # nearest
n = 1
# if RechargeModel, we can infer kind
if (
_kind is None
and stressmodel._name == "RechargeModel"
):
kind = k
elif _kind is None: # catch no kind with bare nearest
raise ValueError(
"Bare 'nearest' found but no kind specified."
)
elif isinstance(_kind, list):
kind = _kind[i] # if multiple kind, select i-th
except Exception as e:
# raise if nearest parsing failed
raise ValueError(
f"Could not parse stresses: '{item}'! "
"When using option 'nearest', use 'nearest' and specify"
" kind, or 'nearest <kind>' or 'nearest <n> <kind>'!"
) from e
# check if kind exists at all
if kind not in self.stresses.kind.values:
raise ValueError(
f"Could not find stresses with kind='{kind}'!"
)
# get stress names of <n> nearest <kind> stresses
inames = self.get_nearest_stresses(
oseries, kind=kind, n=n
).iloc[0]
# check if any NaNs in result
if inames.isna().any():
nkind = (self.stresses.kind == kind).sum()
raise ValueError(
f"Could not find {n} nearest stress(es) for '{kind}'! "
f"There are only {nkind} '{kind}' stresses."
)
# append names
names += inames.tolist()
else:
# assume name is name of stress
names.append(item)
# get stresses and metadata
stress_series, imeta = self.conn.get_stresses(
names, return_metadata=True, squeeze=True
)
# replace stress name(s) with time series
if len(names) > 1:
stresses[k] = list(stress_series.values())
else:
stresses[k] = stress_series
# gather metadata
if isinstance(imeta, list):
metadata += imeta
else:
metadata.append(imeta)
return stresses, metadata
[docs]
def get_stressmodel(
self,
stresses: str | list[str] | dict[str, str],
stressmodel=ps.StressModel,
stressmodel_name: str | None = None,
rfunc=ps.Exponential,
rfunc_kwargs: dict | None = None,
kind: list[str] | str | None = None,
oseries: str | None = None,
**kwargs,
):
"""Get a Pastas stressmodel from stresses time series in Pastastore.
Supports "nearest" selection. Any stress name can be replaced by
"nearest [<n>] <kind>" where <n> is optional and represents the number of
nearest stresses and <kind> and represents the kind of stress to
consider. <kind> can also be specified directly with the `kind` kwarg.
Note: the 'nearest' option requires the oseries name to be provided.
Additionally, 'x' and 'y' metadata must be stored for oseries and stresses.
Parameters
----------
stresses : str | list[str] | dict
name(s) of the time series to use for the stressmodel, or dictionary
with key(s) and value(s) as time series name(s). Options include:
- name of stress: `"prec_stn"`
- list of stress names: `["prec_stn", "evap_stn"]`
- dict for RechargeModel: `{"prec": "prec_stn", "evap": "evap_stn"}`
- dict for StressModel: `{"stress": "well1"}`
- nearest, specifying kind: `"nearest well"`
- nearest specifying number and kind: `"nearest 2 well"`
stressmodel : str or class
stressmodel class to use, by default ps.StressModel
stressmodel_name : str, optional
name of the stressmodel, by default None, which uses the stress name,
if there is 1 stress otherwise the name of the stressmodel type. For
RechargeModels, the name defaults to 'recharge'.
rfunc : str or class
response function class to use, by default ps.Exponential
rfunc_kwargs : dict, optional
keyword arguments to pass to the response function, by default None
kind : str | list[str], optional
specify kind of stress(es) to use, by default None, useful in combination
with 'nearest' option for defining stresses
oseries : str, optional
name of the oseries to use for the stressmodel, by default None, used when
'nearest' option is used for defining stresses.
**kwargs
additional keyword arguments to pass to the stressmodel
Returns
-------
stressmodel : pastas.StressModel
pastas StressModel that can be added to pastas Model.
"""
# get stressmodel class, if str was provided
if isinstance(stressmodel, str):
stressmodel = getattr(ps, stressmodel)
# parse stresses names to get time series and metadata
stresses, metadata = self._parse_stresses(
stresses=stresses, stressmodel=stressmodel, kind=kind, oseries=oseries
)
# get stressmodel name if not provided
if stressmodel_name is None:
if stressmodel._name == "RechargeModel":
stressmodel_name = "recharge"
elif len(metadata) == 1:
stressmodel_name = stresses["stress"].squeeze().name
else:
stressmodel_name = stressmodel._name
# check if metadata is list of len 1 and unpack
if isinstance(metadata, list) and len(metadata) == 1:
metadata = metadata[0]
# get stressmodel time series settings
if kind and "settings" not in kwargs:
# try using kind to get predefined settings options
if isinstance(kind, str):
kwargs["settings"] = ps.rcParams["timeseries"].get(kind, None)
else:
kwargs["settings"] = [
ps.rcParams["timeseries"].get(ikind, None) for ikind in kind
]
elif kind is None and "settings" not in kwargs:
# try using kind stored in metadata to get predefined settings options
if isinstance(metadata, list):
kwargs["settings"] = [
ps.rcParams["timeseries"].get(imeta.get("kind", None), None)
for imeta in metadata
]
elif isinstance(metadata, dict):
kwargs["settings"] = ps.rcParams["timeseries"].get(
metadata.get("kind", None), None
)
# get rfunc class if str was provided
if isinstance(rfunc, str):
rfunc = getattr(ps, rfunc)
# create empty rfunc_kwargs if not provided
if rfunc_kwargs is None:
rfunc_kwargs = {}
# special for WellModels
if stressmodel._name == "WellModel":
if isinstance(stresses["stress"], list):
names = [s.squeeze().name for s in stresses["stress"]]
else:
names = [stresses["stress"].squeeze().name]
stresses["stress"] = [stresses["stress"]] # ugly fix for WellModel
# check oseries is provided
if oseries is None:
raise ValueError("WellModel requires 'oseries' to compute distances!")
# compute distances and add to kwargs
distances = (
self.get_distances(oseries=oseries, stresses=names)
.T.squeeze(axis=1)
.values
)
kwargs["distances"] = distances
# set settings to well
if "settings" not in kwargs:
kwargs["settings"] = "well"
# override rfunc and set to HantushWellModel
rfunc = ps.HantushWellModel
kwargs["metadata"] = metadata
return stressmodel(
**stresses,
rfunc=rfunc(**rfunc_kwargs),
name=stressmodel_name,
**kwargs,
)
[docs]
def add_stressmodel(
self,
ml: ps.Model | str,
stresses: str | list[str] | dict[str, str],
stressmodel=ps.StressModel,
stressmodel_name: str | None = None,
rfunc=ps.Exponential,
rfunc_kwargs: dict | None = None,
kind: list[str] | str | None = None,
**kwargs,
):
"""Add a pastas StressModel from stresses time series in Pastastore.
Supports "nearest" selection. Any stress name can be replaced by
"nearest [<n>] <kind>" where <n> is optional and represents the number of
nearest stresses and <kind> and represents the kind of stress to
consider. <kind> can also be specified directly with the `kind` kwarg.
Note: the 'nearest' option requires the oseries name to be provided.
Additionally, 'x' and 'y' metadata must be stored for oseries and stresses.
Parameters
----------
ml : pastas.Model or str
pastas.Model object to add StressModel to, if passed as string,
model is loaded from store, the stressmodel is added and then written
back to the store.
stresses : str | list[str] | dict
name(s) of the time series to use for the stressmodel, or dictionary
with key(s) and value(s) as time series name(s). Options include:
- name of stress: `"prec_stn"`
- list of stress names: `["prec_stn", "evap_stn"]`
- dict for RechargeModel: `{"prec": "prec_stn", "evap": "evap_stn"}`
- dict for StressModel: `{"stress": "well1"}`
- nearest, specifying kind: `"nearest well"`
- nearest specifying number and kind: `"nearest 2 well"`
stressmodel : str or class
stressmodel class to use, by default ps.StressModel
stressmodel_name : str, optional
name of the stressmodel, by default None, which uses the stress name,
if there is 1 stress otherwise the name of the stressmodel type. For
RechargeModels, the name defaults to 'recharge'.
rfunc : str or class
response function class to use, by default ps.Exponential
rfunc_kwargs : dict, optional
keyword arguments to pass to the response function, by default None
kind : str | list[str], optional
specify kind of stress(es) to use, by default None, useful in combination
with 'nearest' option for defining stresses
**kwargs
additional keyword arguments to pass to the stressmodel
"""
sm = self.get_stressmodel(
stresses=stresses,
stressmodel=stressmodel,
stressmodel_name=stressmodel_name,
rfunc=rfunc,
rfunc_kwargs=rfunc_kwargs,
kind=kind,
oseries=ml if isinstance(ml, str) else ml.oseries.name,
**kwargs,
)
if isinstance(ml, str):
ml: ps.Model = self.conn.get_model(ml)
ml.add_stressmodel(sm)
self.conn.add_model(ml, overwrite=True)
logger.info(
"Stressmodel '%s' added to model '%s' and stored in database.",
sm.name,
ml.name,
)
else:
ml.add_stressmodel(sm)
[docs]
def solve_models(
self,
modelnames: list[str] | str | None = None,
report: bool = False,
ignore_solve_errors: bool = False,
progressbar: bool = True,
parallel: bool = False,
max_workers: int | None = None,
**kwargs,
) -> None:
"""Solves the models in the store.
Parameters
----------
modelnames : list[str], optional
list of model names, if None all models in the pastastore
are solved.
report : boolean, optional
determines if a report is printed when the model is solved,
default is False
ignore_solve_errors : boolean, optional
if True, errors emerging from the solve method are ignored,
default is False which will raise an exception when a model
cannot be optimized
progressbar : bool, optional
show progressbar, default is True.
parallel: bool, optional
if True, solve models in parallel using ProcessPoolExecutor
max_workers: int, optional
maximum number of workers to use in parallel solving, default is
None which will use the number of cores available on the machine
**kwargs : dictionary
arguments are passed to the solve method.
Notes
-----
Users should be aware that parallel solving is platform dependent
and may not always work. The current implementation works well for Linux users.
For Windows users, parallel solving does not work when called directly from
Jupyter Notebooks or IPython. To use parallel solving on Windows, the following
code should be used in a Python file::
from multiprocessing import freeze_support
if __name__ == "__main__":
freeze_support()
pstore.solve_models(parallel=True)
"""
if "mls" in kwargs:
modelnames = kwargs.pop("mls")
logger.warning("Argument `mls` is deprecated, use `modelnames` instead.")
modelnames = self.conn.parse_names(modelnames, libname="models")
# prepare parallel
if parallel and self.conn.conn_type == "dict":
parallel = False
logger.error(
"Parallel solving only supported for PasConnector and "
"ArcticDBConnector databases. Setting parallel to `False`"
)
if parallel:
if self.conn.conn_type == "arcticdb":
solve_model = partial(
self.conn._solve_model,
report=report,
ignore_solve_errors=ignore_solve_errors,
**kwargs,
)
self.conn._parallel(
solve_model,
modelnames,
max_workers=max_workers,
chunksize=None,
progressbar=progressbar,
desc="Solving models (parallel)",
)
elif self.conn.conn_type == "pas":
solve_model = partial(
self.conn._solve_model,
connector=self.conn,
report=report,
ignore_solve_errors=ignore_solve_errors,
**kwargs,
)
self.conn._parallel(
solve_model,
modelnames,
max_workers=max_workers,
chunksize=None,
progressbar=progressbar,
desc="Solving models (parallel)",
)
else:
solve_model = partial(
self.conn._solve_model,
connector=self.conn,
report=report,
ignore_solve_errors=ignore_solve_errors,
**kwargs,
)
for ml_name in (
tqdm(modelnames, desc="Solving models") if progressbar else modelnames
):
solve_model(ml_name=ml_name)
[docs]
def check_models(self, checklist=None, modelnames=None, style_output: bool = False):
"""Check models against checklist.
Parameters
----------
checklist : dict, optional
dictionary containing model check methods, by default None which
uses the ps.checks.checks_brakenhoff_2022 checklist. This includes:
- fit metric R² >= 0.6
- runs test for autocorrelation
- t95 response < half length calibration period
- |model parameters| < 1.96 * σ (std deviation)
- model parameters are not on bounds
modelnames : list[str], optional
list of modelnames to perform checks on, by default None
style_output : bool, optional
if True, return styled dataframe with pass/fail colors,
by default False
Returns
-------
pd.DataFrame
DataFrame containing pass True/False for each check for each model
"""
if checklist is None:
checklist = ps.check.checks_brakenhoff_2022
names = self.conn.parse_names(modelnames, libname="models")
check_dfs = []
for n in names:
cdf = ps.check.checklist(self.models[n], checklist, report=False)["pass"]
cdf.name = n
check_dfs.append(cdf)
chkdf = pd.concat(check_dfs, axis=1)
chkdf.columns.name = "models"
if style_output:
return chkdf.style.map(boolean_styler)
else:
return chkdf
[docs]
def to_zip(self, fname: str | Path, overwrite=False, progressbar: bool = True):
"""Write data to zipfile.
Parameters
----------
fname : str | Path
name of zipfile
overwrite : bool, optional
if True, overwrite existing file
progressbar : bool, optional
show progressbar, by default True
"""
from zipfile import ZIP_DEFLATED, ZipFile
fname = Path(fname)
if fname.exists() and not overwrite:
raise FileExistsError(
"File already exists! Use 'overwrite=True' to force writing file."
)
elif fname.exists():
warnings.warn(f"Overwriting file '{fname.name}'", stacklevel=1)
with ZipFile(fname, "w", compression=ZIP_DEFLATED) as archive:
# oseries
self.zip.series_to_archive(archive, "oseries", progressbar=progressbar)
# stresses
self.zip.series_to_archive(archive, "stresses", progressbar=progressbar)
# models
self.zip.models_to_archive(archive, progressbar=progressbar)
[docs]
def export_model_series_to_csv(
self,
names: list[str] | str | None = None,
exportdir: Path | str = ".",
exportmeta: bool = True,
): # pragma: no cover
"""Export model time series to csv files.
Parameters
----------
names : list[str] | str | None, optional
names of models to export, by default None, which uses retrieves
all models from database
exportdir : str, optional
directory to export csv files to, default is current directory
exportmeta : bool, optional
export metadata for all time series as csv file, default is True
"""
names = self.conn.parse_names(names, libname="models")
exportdir = Path(exportdir)
for name in names:
mldict = self.conn.get_models(name, return_dict=True)
oname = mldict["oseries"]["name"]
o = self.conn.get_oseries(oname)
o.to_csv(exportdir / f"{oname}.csv")
if exportmeta:
metalist = [self.conn.get_metadata("oseries", oname)]
for sm in mldict["stressmodels"]:
if mldict["stressmodels"][sm]["stressmodel"] == "RechargeModel":
for istress in ["prec", "evap"]:
istress = mldict["stressmodels"][sm][istress]
stress_name = istress["name"]
ts = self.conn.get_stresses(stress_name)
ts.to_csv(exportdir / f"{stress_name}.csv")
if exportmeta:
tsmeta = self.get_metadata("stresses", stress_name)
metalist.append(tsmeta)
else:
for istress in mldict["stressmodels"][sm]["stress"]:
stress_name = istress["name"]
ts = self.conn.get_stresses(stress_name)
ts.to_csv(exportdir / f"{stress_name}.csv")
if exportmeta:
tsmeta = self.conn.get_metadata("stresses", stress_name)
metalist.append(tsmeta)
if exportmeta:
pd.concat(metalist, axis=0).to_csv(exportdir / f"metadata_{name}.csv")
[docs]
@classmethod
def from_zip(
cls,
fname: str,
conn: BaseConnector | None = None,
storename: str | None = None,
progressbar: bool = True,
series_ext_json: bool = False,
):
"""Load PastaStore from zipfile.
Parameters
----------
fname : str
pathname of zipfile
conn : Connector object, optional
connector for storing loaded data, default is None which creates a
DictConnector. This Connector does not store data on disk.
storename : str, optional
name of the PastaStore, by default None, which
defaults to the name of the Connector.
progressbar : bool, optional
show progressbar, by default True
series_ext_json : bool, optional
if True, series are expected to have a .json extension, by default False,
which assumes a .pas extension. set this option to true for reading
zipfiles created with older versions of pastastore <1.8.0.
Returns
-------
pastastore.PastaStore
return PastaStore containing data from zipfile
"""
from zipfile import ZipFile
if conn is None:
conn = DictConnector("pastas_db")
if series_ext_json:
ext = "json"
else:
ext = "pas"
# short-circuit for PasConnector when zipfile was written using pas files
if conn.conn_type == "pas" and not series_ext_json:
with ZipFile(fname, "r") as archive:
archive.extractall(conn.path)
if storename is None:
storename = conn.name
# because of the short-circuit, the model cache and reverse lookup tables
# are not aware of the extracted models. Guarantee the update of the
# connector here to ensure everything is recomputed.
conn._clear_cache("_modelnames_cache")
conn._update_time_series_model_links(recompute=True)
return cls(conn, storename)
with ZipFile(fname, "r") as archive:
namelist = [
fi
for fi in archive.namelist()
if not fi.endswith(f"_meta.{ext}") and not fi.endswith(os.sep)
]
for f in tqdm(namelist, desc="Reading zip") if progressbar else namelist:
if f.endswith("_meta.json"):
raise (
ValueError(
"The zipfile was created using pastastore <1.8.0."
" Please pass `series_ext_json=True` to `from_zip()`"
)
)
libname, fjson = os.path.split(f)
libname = os.path.split(libname)[-1] # in case zip is one level deeper
if libname in ["stresses", "oseries"]:
s = pd.read_json(archive.open(f), dtype=float, orient="columns")
if not isinstance(s.index, pd.DatetimeIndex):
s.index = pd.to_datetime(s.index, unit="ms")
s = s.sort_index()
meta = json.load(archive.open(f.replace(f".{ext}", f"_meta.{ext}")))
conn._add_series(
libname=libname,
series=s,
name=fjson.split(".")[0],
metadata=meta,
)
elif libname in ["models"]:
ml = json.load(archive.open(f), object_hook=pastas_hook)
conn.add_model(ml)
if storename is None:
storename = conn.name
return cls(conn, storename)
[docs]
def search(
self,
s: list | str | None = None,
libname: PastasLibs | None = None,
case_sensitive: bool = True,
sort=True,
):
"""Search for names of time series or models containing string `s`.
Parameters
----------
libname : str
name of the library to search in
s : str, lst
find names with part of this string or strings in list
case_sensitive : bool, optional
whether search should be case sensitive, by default True
sort : bool, optional
sort list of names
Returns
-------
matches : list
list of names that match search result
"""
if isinstance(s, str) and s in [None, "models", "stresses", "oseries"]:
warnings.warn(
"The order of arguments 's' and 'libname' has been flipped since v1.11."
" Please update your code to use pstore.search(s=..., libname=...).",
DeprecationWarning,
stacklevel=2,
)
temp = str(s) # create copy
s = str(libname) # create copy
libname = temp
if libname == "models":
lib_names = {"models": self.model_names}
elif libname == "stresses":
lib_names = {"stresses": self.stresses_names}
elif libname == "oseries":
lib_names = {"oseries": self.oseries_names}
elif libname is None:
lib_names = {
"oseries": self.oseries_names,
"stresses": self.stresses_names,
"models": self.model_names,
}
else:
raise ValueError(
"Provide valid libname: 'models', 'stresses', 'oseries' or None"
" to seach within all libraries."
)
result = {}
for lib, names in lib_names.items():
if isinstance(s, str):
if case_sensitive:
matches = [n for n in names if s in n]
else:
matches = [n for n in names if s.lower() in n.lower()]
elif isinstance(s, list):
m = np.array([])
for sub in s:
if case_sensitive:
m = np.append(m, [n for n in names if sub in n])
else:
m = np.append(m, [n for n in names if sub.lower() in n.lower()])
matches = list(np.unique(m))
else:
raise TypeError("s must be str or list of str!")
if sort:
matches.sort()
result[lib] = matches
if len(result) == 1:
return result[libname]
else:
return result
[docs]
def apply(
self,
libname: PastasLibs,
func: Callable,
names: list[str] | str | None = None,
kwargs: dict | None = None,
progressbar: bool = True,
parallel: bool = False,
max_workers: int | None = None,
fancy_output: bool = True,
initializer: Callable | None = None,
initargs: tuple | None = None,
) -> dict | pd.Series | pd.DataFrame | Any:
"""Apply function to items in library.
Supported libraries are oseries, stresses, and models.
Parameters
----------
libname : str
library name, supports "oseries", "stresses" and "models"
func : callable
function that accepts a string corresponding to the name of an item in
the library as its first argument. Additional keyword arguments can be
specified. The function can return any result, or update an item in the
database without returning anything.
names : str | list[str], optional
apply function to these names, by default None which loops over all stored
items in library
kwargs : dict, optional
keyword arguments to pass to func, by default None
progressbar : bool, optional
show progressbar, by default True
parallel : bool, optional
run apply in parallel, default is False.
max_workers : int, optional
max no. of workers, only used if parallel is True
fancy_output : bool, optional
if True, try returning result as pandas Series or DataFrame, by default
False
initializer : Callable, optional
function to initialize each worker process, only used if parallel is True
initargs : tuple, optional
arguments to pass to initializer, only used if parallel is True
Returns
-------
dict
dict of results of func, with names as keys and results as values
Notes
-----
Users should be aware that parallel solving is platform dependent
and may not always work. The current implementation works well for Linux users.
For Windows users, parallel solving does not work when called directly from
Jupyter Notebooks or IPython. To use parallel solving on Windows, the following
code should be used in a Python file::
from multiprocessing import freeze_support
if __name__ == "__main__":
freeze_support()
pstore.apply("models", some_func, parallel=True)
"""
names = self.conn.parse_names(names, libname)
if kwargs is None:
kwargs = {}
if libname not in ("oseries", "stresses", "models"):
raise ValueError(
"'libname' must be one of ['oseries', 'stresses', 'models']!"
)
if parallel:
result = self.conn._parallel(
func,
kwargs=kwargs,
names=names,
progressbar=progressbar,
max_workers=max_workers,
chunksize=None,
desc=f"Computing {getattr(func, '__name__', '')} (parallel)",
initializer=initializer,
initargs=initargs,
)
else:
result = []
for n in tqdm(
names,
desc=f"Computing {getattr(func, '__name__', '')}",
disable=not progressbar,
):
result.append(func(n, **kwargs))
if fancy_output:
return PastaStore._fancy_output(
result, names, getattr(func, "__name__", "")
)
else:
return result
@staticmethod
def _fancy_output(
result: Iterable,
names: list[str],
label: str | None = None,
) -> pd.Series | pd.DataFrame | dict | None:
"""Convert apply result to pandas Series, DataFrame or dict.
Parameters
----------
result : Iterable
result of apply function
names : list
list of names
label : str, optional
label for columns, by default None
Returns
-------
pd.Series, pd.DataFrame, dict
Series, DataFrame or dict with results
"""
if not isinstance(result, list):
result = list(result)
if isinstance(result[0], (float, int, np.integer)):
return pd.Series(result, index=names)
elif isinstance(result[0], (pd.Series, pd.DataFrame)):
if isinstance(result[0], pd.Series):
if names[0] == result[0].name:
df = pd.concat(result, axis=1)
else:
df = pd.concat(dict(zip(names, result, strict=True)), axis=1)
else:
df = pd.concat(dict(zip(names, result, strict=True)), axis=1)
if label is not None:
df.columns.name = label
return df
elif result[0] is None:
return None # return None if first result is None?
else:
return dict(zip(names, result, strict=True))
[docs]
def within(
self,
extent: list,
names: list[str] | None = None,
libname: PastasLibs = "oseries",
):
"""Get names of items within extent.
Parameters
----------
extent : list
list with [xmin, xmax, ymin, ymax]
names : str | list[str], optional
list of names to include, by default None
libname : str, optional
name of library, must be one of ('oseries', 'stresses', 'models'), by
default "oseries"
Returns
-------
list
list of items within extent
"""
xmin, xmax, ymin, ymax = extent
names = self.conn.parse_names(names, libname)
if libname == "oseries":
df = self.oseries.loc[names]
elif libname == "stresses":
df = self.stresses.loc[names]
elif libname == "models":
onames = np.unique(
[
self.get_models(modelname, return_dict=True)["oseries"]["name"]
for modelname in names
]
)
df = self.oseries.loc[onames]
else:
raise ValueError(
"libname must be one of ['oseries', 'stresses', 'models']"
f", got '{libname}'"
)
mask = (
(df["x"] <= xmax)
& (df["x"] >= xmin)
& (df["y"] >= ymin)
& (df["y"] <= ymax)
)
return df.loc[mask].index.tolist()