Source code for pastastore.base

# ruff: noqa: B019
"""Base classes for PastaStore Connectors."""

import functools
import logging
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from itertools import chain
from random import choice
from typing import Any, Callable

import pandas as pd
import pastas as ps
from packaging.version import parse as parse_version

from pastastore._tqdm import tqdm
from pastastore.typing import AllLibs, DataFrameOrSeries, TimeSeriesLibs
from pastastore.util import (
    ItemInLibraryException,
    SeriesUsedByModel,
    _custom_warning,
    validate_names,
)
from pastastore.validator import Validator
from pastastore.version import PASTAS_GEQ_150

warnings.showwarning = _custom_warning

logger = logging.getLogger(__name__)


[docs] class ConnectorUtil: """Mix-in class for utility methods used by BaseConnector subclasses. This class contains internal methods for parsing names, handling metadata, and parsing model dictionaries. It is designed to be mixed into BaseConnector subclasses and assumes the presence of certain attributes and methods from BaseConnector (e.g., oseries_names, stresses_names, get_oseries, get_stresses). Note ---- This class should not be instantiated directly. It is intended to be used as a mixin with BaseConnector subclasses only. """
[docs] def _parse_names( self, names: list[str] | str | None = None, libname: AllLibs = "oseries", ) -> list: """Parse names kwarg, returns iterable with name(s) (internal method). Parameters ---------- names : list | str, optional str or list of str or None or 'all' (last two options retrieves all names) libname : str, optional name of library, default is 'oseries' Returns ------- list list of names """ if not isinstance(names, str) and isinstance(names, Iterable): return names elif isinstance(names, str) and names != "all": return [names] elif names is None or names == "all": if libname == "oseries": return self.oseries_names elif libname == "stresses": return self.stresses_names elif libname == "models": return self.model_names elif libname == "oseries_models": return self.oseries_with_models elif libname == "stresses_models": return self.stresses_with_models else: raise ValueError(f"No library '{libname}'!") else: raise NotImplementedError(f"Cannot parse 'names': {names}")
[docs] @staticmethod def _meta_list_to_frame(metalist: list[dict], names: list[str]) -> pd.DataFrame: """Convert list of metadata dictionaries to DataFrame. Parameters ---------- metalist : list list of metadata dictionaries names : list list of names corresponding to data in metalist Returns ------- pandas.DataFrame DataFrame containing overview of metadata """ # convert to dataframe meta_index = pd.Index(names, name="name") if len(metalist) > 1: meta = pd.DataFrame(metalist, index=meta_index) if len({"x", "y"}.difference(meta.columns)) == 0: meta["x"] = meta["x"].astype(float) meta["y"] = meta["y"].astype(float) elif len(metalist) == 1: meta = pd.DataFrame(metalist, index=meta_index) elif len(metalist) == 0: meta = pd.DataFrame(index=meta_index) return meta
[docs] def _parse_model_dict( self, mdict: dict, update_ts_settings: bool = False ) -> ps.Model: """Parse dictionary describing pastas models (internal method). Parameters ---------- mdict : dict dictionary describing pastas.Model update_ts_settings : bool, optional update stored tmin and tmax in time series settings based on time series loaded from store. Returns ------- ml : pastas.Model time series analysis model """ PASFILE_LEQ_022 = parse_version( mdict["file_info"]["pastas_version"] ) <= parse_version("0.22.0") # oseries if "series" not in mdict["oseries"]: name = str(mdict["oseries"]["name"]) if not self._item_exists("oseries", name): msg = f"oseries '{name}' not present in library" raise LookupError(msg) mdict["oseries"]["series"] = self.get_oseries(name).squeeze() # update tmin/tmax from time series if update_ts_settings: mdict["oseries"]["settings"]["tmin"] = mdict["oseries"]["series"].index[ 0 ] mdict["oseries"]["settings"]["tmax"] = mdict["oseries"]["series"].index[ -1 ] # StressModel, WellModel for ts in mdict["stressmodels"].values(): if "stress" in ts.keys(): # WellModel classkey = "stressmodel" if PASFILE_LEQ_022 else "class" if ts[classkey] == "WellModel": for stress in ts["stress"]: if "series" not in stress: name = str(stress["name"]) if self._item_exists("stresses", name): stress["series"] = self.get_stresses(name).squeeze() # update tmin/tmax from time series if update_ts_settings: stress["settings"]["tmin"] = stress["series"].index[ 0 ] stress["settings"]["tmax"] = stress["series"].index[ -1 ] # StressModel else: for stress in ts["stress"] if PASFILE_LEQ_022 else [ts["stress"]]: if "series" not in stress: name = str(stress["name"]) if self._item_exists("stresses", name): stress["series"] = self.get_stresses(name).squeeze() # update tmin/tmax from time series if update_ts_settings: stress["settings"]["tmin"] = stress["series"].index[ 0 ] stress["settings"]["tmax"] = stress["series"].index[ -1 ] # RechargeModel, TarsoModel if ("prec" in ts.keys()) and ("evap" in ts.keys()): for stress in [ts["prec"], ts["evap"]]: if "series" not in stress: name = str(stress["name"]) if self._item_exists("stresses", name): stress["series"] = self.get_stresses(name).squeeze() # update tmin/tmax from time series if update_ts_settings: stress["settings"]["tmin"] = stress["series"].index[0] stress["settings"]["tmax"] = stress["series"].index[-1] else: msg = "stress '{name}' not present in library" raise KeyError(msg) # hack for pcov w dtype object (when filled with NaNs on store?) if "fit" in mdict: if "pcov" in mdict["fit"]: pcov = mdict["fit"]["pcov"] if pcov.dtypes.apply(lambda dtyp: isinstance(dtyp, object)).any(): mdict["fit"]["pcov"] = pcov.astype(float) # check pastas version vs pas-file version file_version = mdict["file_info"]["pastas_version"] # check file version and pastas version # if file<0.23 and pastas>=1.0 --> error PASTAS_GT_023 = parse_version(ps.__version__) > parse_version("0.23.1") if PASFILE_LEQ_022 and PASTAS_GT_023: raise UserWarning( f"This file was created with Pastas v{file_version} " f"and cannot be loaded with Pastas v{ps.__version__} Please load and " "save the file with Pastas 0.23 first to update the file " "format." ) # Use pastas' internal _load_model - required for model reconstruction ml = ps.io.base._load_model(mdict) # noqa: SLF001 return ml
[docs] class BaseConnector(ABC, ConnectorUtil): """Base Connector class. Class holds base logic for dealing with time series and Pastas Models. Create your own Connector to a data source by writing a a class that inherits from this BaseConnector. Your class has to override each abstractmethod and property. """ _default_library_names = ( "oseries", "stresses", "models", "oseries_models", "stresses_models", ) _conn_type: str | None = None _validator: Validator | None = None name: str | None = None def __init__(self): self._added_models = [] # internal list of added models used for updating links def __getstate__(self): """Return picklable state, stripping Manager objects and proxies. ``multiprocessing.Manager`` instances and their proxy objects cannot be pickled. When a connector is serialised for a worker process the flags are stored as plain ``bool`` values so that workers start with a clean, independent copy. The ``_manager`` attribute (if present) is removed entirely. """ state = self.__dict__.copy() # Remove the Manager instance – it is not picklable and workers do not # need a reference back to the main-process manager server. state.pop("_manager", None) # Replace Manager proxy values with plain booleans. The flags may # already be plain booleans when _ensure_manager_proxies() has not yet # been called (lazy init), so we guard with hasattr. if hasattr(self._oseries_links_need_update, "value"): state["_oseries_links_need_update"] = self._oseries_links_need_update.value state["_stresses_links_need_update"] = ( self._stresses_links_need_update.value ) return state def __setstate__(self, state): """Restore state after unpickling. After unpickling, flags are plain booleans (set by ``__getstate__``). Worker processes operate on independent copies and do not need shared memory proxies. """ self.__dict__.update(state) # Flags are already plain booleans from __getstate__; nothing more to do. def __repr__(self): """Representation string of the object.""" return ( f"<{type(self).__name__}> '{self.name}': " f"{self.n_oseries} oseries, " f"{self.n_stresses} stresses, " f"{self.n_models} models" ) @property def validation_settings(self) -> dict: """Return current connector settings as dictionary.""" return self.validator.settings @property def empty(self) -> bool: """Check if the database is empty.""" return not any([self.n_oseries > 0, self.n_stresses > 0, self.n_models > 0]) @property def validator(self) -> Validator: """Get the Validator instance for this connector.""" if self._validator is None: raise AttributeError("Validator not set for this connector.") return self._validator @property def conn_type(self) -> str: """Get the connector type.""" if self._conn_type is None: raise AttributeError( "Connector class must set a connector type in `conn_type` attribute." ) return self._conn_type
[docs] @abstractmethod def _get_library(self, libname: AllLibs): """Get library handle. Must be overridden by subclass. Parameters ---------- libname : str name of the library Returns ------- lib : Any handle to the library """
[docs] @abstractmethod def _add_item( self, libname: AllLibs, item: DataFrameOrSeries | dict, name: str, metadata: dict | None = None, ) -> None: """Add item for both time series and pastas.Models (internal method). Must be overridden by subclass. Parameters ---------- libname : str name of library to add item to item : DataFrameOrSeries | dict item to add name : str name of the item metadata : dict, optional dictionary containing metadata, by default None Note ---- Metadata storage can vary by connector: - ArcticDB: Native metadata support via write() - DictConnector: Stored as tuple (metadata, item) - PasConnector: Separate {name}_meta.pas JSON file """
[docs] @abstractmethod def _get_item(self, libname: AllLibs, name: str) -> DataFrameOrSeries | dict: """Get item (series or pastas.Models) (internal method). Must be overridden by subclass. Parameters ---------- libname : str name of library name : str name of item Returns ------- item : DataFrameOrSeries | dict item (time series or pastas.Model) """
[docs] @abstractmethod def _del_item(self, libname: AllLibs, name: str, force: bool = False) -> None: """Delete items (series or models) (internal method). Must be overridden by subclass. Parameters ---------- libname : str name of library to delete item from name : str name of item to delete """
[docs] @abstractmethod def _get_metadata(self, libname: TimeSeriesLibs, name: str) -> dict: """Get metadata (internal method). Must be overridden by subclass. Parameters ---------- libname : str name of the library name : str name of the item Returns ------- metadata : dict dictionary containing metadata """
[docs] @abstractmethod def _list_symbols(self, libname: AllLibs) -> list[str]: """Return list of symbol names in library."""
[docs] @abstractmethod def _item_exists(self, libname: AllLibs, name: str) -> bool: """Return True if item present in library, else False."""
@property def oseries_names(self): """List of oseries names. Property must be overridden by subclass. """ return self._list_symbols("oseries") @property def stresses_names(self): """List of stresses names. Property must be overridden by subclass. """ return self._list_symbols("stresses") @property def model_names(self): """List of model names. Property must be overridden by subclass. """ return self._modelnames_cache @property def oseries_with_models(self): """List of oseries used in models. Property must be overridden by subclass. """ self._trigger_links_update_if_needed() return self._list_symbols("oseries_models") @property def stresses_with_models(self): """List of stresses used in models. Property must be overridden by subclass. """ self._trigger_links_update_if_needed() return self._list_symbols("stresses_models")
[docs] @abstractmethod def _parallel( self, func: Callable, names: list[str], kwargs: dict | None = None, progressbar: bool | None = True, max_workers: int | None = None, chunksize: int | None = None, desc: str = "", ) -> None: """Parallel processing of function. Must be overridden by subclass. Parameters ---------- func : function function to apply in parallel names : list list of names to apply function to kwargs : dict additional keyword arguments to pass to function progressbar : bool, optional show progressbar, by default True max_workers : int, optional maximum number of workers, by default None chunksize : int, optional chunksize for parallel processing, by default None desc : str, optional description for progressbar, by default "" """
[docs] def parse_names( self, names: list[str] | str | None = None, libname: AllLibs = "oseries", ) -> list[str]: """Parse names argument and return list of names. Public method that exposes name parsing functionality. Parameters ---------- names : list | str, optional str or list of str or None or 'all' (last two options retrieves all names) libname : str, optional name of library, default is 'oseries' Returns ------- list list of names """ return self._parse_names(names, libname)
@property @functools.lru_cache() def oseries(self) -> pd.DataFrame: """Dataframe with overview of oseries.""" return self.get_metadata(libname="oseries", names=self.oseries_names) @property @functools.lru_cache() def stresses(self) -> pd.DataFrame: """Dataframe with overview of stresses.""" return self.get_metadata(libname="stresses", names=self.stresses_names) @property @functools.lru_cache() def _modelnames_cache(self) -> list[str]: """List of model names.""" return self._list_symbols(libname="models") @property def n_oseries(self): """ Returns the number of oseries. Returns ------- int The number of oseries names. """ return len(self.oseries_names) @property def n_stresses(self): """ Returns the number of stresses. Returns ------- int The number of stresses. """ return len(self.stresses_names) @property def n_models(self): """ Returns the number of models in the store. Returns ------- int The number of models in the store. """ return len(self.model_names) @property @functools.lru_cache() def oseries_models(self) -> dict[str, list[str]]: """List of model names per oseries. Returns ------- d : dict dictionary with oseries names as keys and list of model names as values """ self._trigger_links_update_if_needed() d = {} for onam in self.oseries_with_models: d[onam] = self._get_item("oseries_models", onam) return d @property @functools.lru_cache() def stresses_models(self) -> dict[str, list[str]]: """List of model names per stress. Returns ------- d : dict dictionary with stress names as keys and list of model names as values """ self._trigger_links_update_if_needed() d = {} for stress_name in self.stresses_with_models: d[stress_name] = self._get_item("stresses_models", stress_name) return d
[docs] def _add_series( self, libname: TimeSeriesLibs, series: DataFrameOrSeries, name: str, metadata: dict | None = None, validate: bool | None = None, overwrite: bool = False, ) -> None: """Add series to database (internal method). Parameters ---------- libname : str name of the library to add the series to series : pandas.Series or pandas.DataFrame data to add name : str name of the time series metadata : dict, optional dictionary containing metadata, by default None validate: bool, optional use pastas to validate series, default is None, which will use the USE_PASTAS_VALIDATE_SERIES value (default is True). overwrite : bool, optional overwrite existing dataset with the same name, by default False Raises ------ ItemInLibraryException if overwrite is False and name is already in the database """ if not isinstance(name, str): name = str(name) if metadata: self.validator.validate_metadata(metadata) self.validator.validate_input_series(series) series = self.validator.set_series_name(series, name) if self.validator.pastas_validation_status(validate): if libname == "oseries": if PASTAS_GEQ_150 and not ps.validate_oseries(series): raise ValueError( "oseries does not meet pastas criteria," " see `ps.validate_oseries()`!" ) else: ps.validate_oseries(series) else: if PASTAS_GEQ_150 and not ps.validate_stress(series): raise ValueError( "stress does not meet pastas criteria," " see `ps.validate_stress()`!" ) else: ps.validate_stress(series) in_store = getattr(self, f"{libname}_names") if name not in in_store or overwrite: self._add_item(libname, series, name, metadata=metadata) self._clear_cache(libname) elif (libname == "oseries" and self._item_exists("oseries_models", name)) or ( libname == "stresses" and self._item_exists("stresses_models", name) ): raise SeriesUsedByModel( f"Time series with name '{name}' is used by a model! " "Use overwrite=True to replace existing time series. " "Note that this may modify the model!" ) else: raise ItemInLibraryException( f"Time series with name '{name}' already in '{libname}' library! " "Use overwrite=True to replace existing time series." )
def _update_series( self, libname: TimeSeriesLibs, series: DataFrameOrSeries, name: str, metadata: dict | None = None, validate: bool | None = None, force: bool = False, ) -> None: """Update time series (internal method). Parameters ---------- libname : str name of library series : DataFrameOrSeries time series containing update values name : str name of the time series to update metadata : dict | None, optional optionally provide metadata dictionary which will also update the current stored metadata dictionary, by default None validate: bool, optional use pastas to validate series, default is None, which will use the USE_PASTAS_VALIDATE_SERIES value (default is True). force : bool, optional force update even if time series is used in a model, by default False """ if libname not in ["oseries", "stresses"]: raise ValueError("Library must be 'oseries' or 'stresses'!") if not force: self.validator.check_series_in_models(libname, name) self.validator.validate_input_series(series) series = self.validator.set_series_name(series, name) stored = self._get_series(libname, name, progressbar=False) if self.conn_type == "pas" and not isinstance(series, type(stored)): if isinstance(series, pd.DataFrame): stored = stored.to_frame() # get union of index idx_union = stored.index.union(series.index) # update series with new values update = stored.reindex(idx_union) update.update(series) # metadata update_meta = self._get_metadata(libname, name) if metadata is not None: update_meta.update(metadata) self._add_series( libname, update, name, metadata=update_meta, validate=validate, overwrite=True, )
[docs] def _upsert_series( self, libname: TimeSeriesLibs, series: DataFrameOrSeries, name: str, metadata: dict | None = None, validate: bool | None = None, force: bool = False, ) -> None: """Update or insert series depending on whether it exists in store. Parameters ---------- libname : str name of library series : DataFrameOrSeries time series to update/insert name : str name of the time series metadata : dict | None, optional metadata dictionary, by default None validate : bool, optional use pastas to validate series, default is None, which will use the USE_PASTAS_VALIDATE_SERIES value (default is True). force : bool, optional force update even if time series is used in a model, by default False """ if libname not in ["oseries", "stresses"]: raise ValueError("Library must be 'oseries' or 'stresses'!") if name in getattr(self, f"{libname}_names"): self._update_series( libname, series, name, metadata=metadata, validate=validate, force=force ) else: self._add_series( libname, series, name, metadata=metadata, validate=validate )
[docs] def update_metadata( self, libname: TimeSeriesLibs, name: str, metadata: dict ) -> None: """Update metadata. Note: also retrieves and stores time series as updating only metadata is not supported for some Connectors. Parameters ---------- libname : str name of library name : str name of the item for which to update metadata metadata : dict metadata dictionary that will be used to update the stored metadata """ if libname not in ["oseries", "stresses"]: raise ValueError("Library must be 'oseries' or 'stresses'!") update_meta = self._get_metadata(libname, name) update_meta.update(metadata) # get series, since just updating metadata is not really defined # in all cases s = self._get_series(libname, name, progressbar=False) self._add_series(libname, s, name, metadata=update_meta, overwrite=True)
[docs] def add_oseries( self, series: DataFrameOrSeries, name: str, metadata: dict | None = None, validate: bool | None = None, overwrite: bool = False, ) -> None: """Add oseries to the database. Parameters ---------- series : pandas.Series or pandas.DataFrame data to add name : str name of the time series metadata : dict, optional dictionary containing metadata, by default None. validate : bool, optional use pastas to validate series, default is None, which will use the USE_PASTAS_VALIDATE_SERIES value (default is True). overwrite : bool, optional overwrite existing dataset with the same name, by default False """ self._add_series( "oseries", series, name=name, metadata=metadata, validate=validate, overwrite=overwrite, )
[docs] def add_stress( self, series: DataFrameOrSeries, name: str, kind: str, metadata: dict | None = None, validate: bool | None = None, overwrite: bool = False, ) -> None: """Add stress to the database. Parameters ---------- series : pandas.Series or pandas.DataFrame data to add, if pastas.Timeseries is passed, series_orignal and metadata is stored in database name : str name of the time series kind : str category to identify type of stress, this label is added to the metadata dictionary. metadata : dict, optional dictionary containing metadata, by default None. validate : bool, optional use pastas to validate series, default is True overwrite : bool, optional overwrite existing dataset with the same name, by default False """ if metadata is None: metadata = {} metadata["kind"] = kind self._add_series( "stresses", series, name=name, metadata=metadata, validate=validate, overwrite=overwrite, )
[docs] def add_model( self, ml: ps.Model | dict, overwrite: bool = False, validate_metadata: bool = False, ) -> None: """Add model to the database. Parameters ---------- ml : pastas.Model or dict pastas Model or dictionary to add to the database overwrite : bool, optional if True, overwrite existing model, by default False validate_metadata, bool optional remove unsupported characters from metadata dictionary keys Raises ------ TypeError if model is not pastas.Model or dict ItemInLibraryException if overwrite is False and model is already in the database """ if isinstance(ml, ps.Model): mldict = ml.to_dict(series=False) name = ml.name if validate_metadata: metadata = validate_names(d=ml.oseries.metadata) else: metadata = ml.oseries.metadata elif isinstance(ml, dict): mldict = ml name = ml["name"] metadata = None else: raise TypeError("Expected pastas.Model or dict!") if not isinstance(name, str): name = str(name) if not self._item_exists("models", name) or overwrite: # check if stressmodels supported self.validator.check_stressmodels_supported(ml) # check oseries and stresses names and if they exist in store self.validator.check_model_series_names_duplicates(ml) self.validator.check_oseries_in_store(ml) self.validator.check_stresses_in_store(ml) # write model to store self._add_item("models", mldict, name, metadata=metadata) self._clear_cache("_modelnames_cache") # avoid updating links so parallel operations do not simultaneously # access the same object. Indicate that these links need updating and # clear existing caches. Handle both Manager proxies and booleans if hasattr(self._oseries_links_need_update, "value"): self._oseries_links_need_update.value = True self._stresses_links_need_update.value = True # this won't update main instance in parallel # ensure model name is only added once if name not in self._added_models: self._added_models.append(name) else: self._oseries_links_need_update = True self._stresses_links_need_update = True # ensure model name is only added once if name not in self._added_models: self._added_models.append(name) self._clear_cache("oseries_models") self._clear_cache("stresses_models") else: raise ItemInLibraryException( f"Model with name '{name}' already in 'models' library! " "Use overwrite=True to replace existing model." )
[docs] def _update_series( self, libname: TimeSeriesLibs, series: DataFrameOrSeries, name: str, metadata: dict | None = None, validate: bool | None = None, force: bool = False, ) -> None: """Update time series (internal method). Parameters ---------- libname : str name of library series : DataFrameOrSeries time series containing update values name : str name of the time series to update metadata : dict | None, optional optionally provide metadata dictionary which will also update the current stored metadata dictionary, by default None validate: bool, optional use pastas to validate series, default is None, which will use the USE_PASTAS_VALIDATE_SERIES value (default is True). force : bool, optional force update even if time series is used in a model, by default False """ if libname not in ["oseries", "stresses"]: raise ValueError("Library must be 'oseries' or 'stresses'!") if not force: self.validator.check_series_in_models(libname, name) self.validator.validate_input_series(series) series = self.validator.set_series_name(series, name) stored = self._get_series(libname, name, progressbar=False) if self.conn_type == "pas" and not isinstance(series, type(stored)): if isinstance(series, pd.DataFrame): stored = stored.to_frame() # get union of index idx_union = stored.index.union(series.index) # update series with new values update = stored.reindex(idx_union) update.update(series) # metadata update_meta = self._get_metadata(libname, name) if metadata is not None: update_meta.update(metadata) self._add_series( libname=libname, series=update, name=name, metadata=update_meta, validate=validate, overwrite=True, )
[docs] def update_oseries( self, series: DataFrameOrSeries, name: str, metadata: dict | None = None, force: bool = False, ) -> None: """Update oseries values. Parameters ---------- series : DataFrameOrSeries time series to update stored oseries with name : str name of the oseries to update metadata : dict | None, optional optionally provide metadata, which will update the stored metadata dictionary, by default None force : bool, optional force update even if time series is used in a model, by default False """ self._update_series("oseries", series, name, metadata=metadata, force=force)
[docs] def update_stress( self, series: DataFrameOrSeries, name: str, metadata: dict | None = None, force: bool = False, ) -> None: """Update stresses values. Note: the 'kind' attribute of a stress cannot be updated! To update the 'kind' delete and add the stress again. Parameters ---------- series : DataFrameOrSeries time series to update stored stress with name : str name of the stress to update metadata : dict | None, optional optionally provide metadata, which will update the stored metadata dictionary, by default None force : bool, optional force update even if time series is used in a model, by default False """ self._update_series("stresses", series, name, metadata=metadata, force=force)
[docs] def upsert_oseries( self, series: DataFrameOrSeries, name: str, metadata: dict | None = None, force: bool = False, ) -> None: """Update or insert oseries values depending on whether it exists. Parameters ---------- series : DataFrameOrSeries time series to update/insert name : str name of the oseries metadata : dict | None, optional optionally provide metadata, which will update the stored metadata dictionary if it exists, by default None force : bool, optional force update even if time series is used in a model, by default False """ self._upsert_series("oseries", series, name, metadata=metadata, force=force)
[docs] def upsert_stress( self, series: DataFrameOrSeries, name: str, kind: str, metadata: dict | None = None, force: bool = False, ) -> None: """Update or insert stress values depending on whether it exists. Parameters ---------- series : DataFrameOrSeries time series to update/insert name : str name of the stress metadata : dict | None, optional optionally provide metadata, which will update the stored metadata dictionary if it exists, by default None kind : str category to identify type of stress, this label is added to the metadata dictionary. force : bool, optional force update even if time series is used in a model, by default False """ if metadata is None: metadata = {} metadata["kind"] = kind self._upsert_series("stresses", series, name, metadata=metadata, force=force)
[docs] def del_models(self, names: list | str, verbose: bool = True) -> None: """Delete model(s) from the database. Parameters ---------- names : str | list[str] name(s) of the model to delete verbose : bool, optional print information about deleted models, by default True """ names = self._parse_names(names, libname="models") for n in names: mldict = self.get_models(n, return_dict=True) oname = mldict["oseries"]["name"] self._del_item("models", n) # delete reference to added model if present if n in self._added_models: self._added_models.remove(n) else: self._del_oseries_model_link(oname, n) self._del_stress_model_link(self._get_model_stress_names(mldict), n) self._clear_cache("_modelnames_cache") if verbose: logger.info("Deleted %d model(s) from database.", len(names))
[docs] def del_model(self, names: list | str, verbose: bool = True) -> None: """Delete model(s) from the database. Alias for del_models(). Parameters ---------- names : str | list[str] name(s) of the model to delete verbose : bool, optional print information about deleted models, by default True """ self.del_models(names=names, verbose=verbose)
[docs] def del_oseries( self, names: list | str, remove_models: bool = False, force: bool = False, verbose: bool = True, ): """Delete oseries from the database. Parameters ---------- names : str | list[str] name(s) of the oseries to delete remove_models : bool, optional also delete models for deleted oseries, default is False force : bool, optional force deletion of oseries that are used in models, by default False verbose : bool, optional print information about deleted oseries, by default True """ names = self._parse_names(names, libname="oseries") for n in names: self._del_item("oseries", n, force=force) self._clear_cache("oseries") if verbose: logger.info("Deleted %d oseries from database.", len(names)) # remove associated models from database if remove_models: modelnames = list( chain.from_iterable([self.oseries_models.get(n, []) for n in names]) ) self.del_models(modelnames, verbose=verbose) if verbose: logger.info("Deleted %d model(s) from database.", len(modelnames))
[docs] def del_stress( self, names: list | str, remove_models: bool = False, force: bool = False, verbose: bool = True, ): """Delete stress from the database. Parameters ---------- names : str | list[str] name(s) of the stress to delete remove_models : bool, optional also delete models for deleted stresses, default is False force : bool, optional force deletion of stresses that are used in models, by default False verbose : bool, optional print information about deleted stresses, by default True """ names = self._parse_names(names, libname="stresses") for n in names: self._del_item("stresses", n, force=force) self._clear_cache("stresses") if verbose: logger.info("Deleted %d stress(es) from database.", len(names)) # remove associated models from database if remove_models: modelnames = list( chain.from_iterable([self.stresses_models.get(n, []) for n in names]) ) self.del_models(modelnames, verbose=verbose) if verbose: logger.info("Deleted %d model(s) from database.", len(modelnames))
[docs] def _get_series( self, libname: str, names: list | str, progressbar: bool = True, squeeze: bool = True, ) -> DataFrameOrSeries: """Get time series (internal method). Parameters ---------- libname : str name of the library names : str | list[str] names of the time series to load progressbar : bool, optional show progressbar, by default True squeeze : bool, optional if True return DataFrame or Series instead of dictionary for single entry Returns ------- pandas.DataFrame or dict of pandas.DataFrames either returns time series as pandas.DataFrame or dictionary containing the time series. """ ts = {} names = self._parse_names(names, libname=libname) desc = f"Get {libname}" n = None for n in tqdm(names, desc=desc) if progressbar else names: ts[n] = self._get_item(libname, n) # return frame if len == 1 if len(ts) == 1 and squeeze: return ts[n] else: return ts
[docs] def get_metadata( self, libname: str, names: list | str, progressbar: bool = False, as_frame: bool = True, squeeze: bool = True, ) -> dict[str, Any] | pd.DataFrame: """Read metadata from database. Parameters ---------- libname : str name of the library containing the dataset names : str | list[str] names of the datasets for which to read the metadata squeeze : bool, optional if True return dict instead of list of dict for single entry Returns ------- dict | pandas.DataFrame returns metadata dictionary or DataFrame of metadata """ metalist = [] names = self._parse_names(names, libname=libname) desc = f"Get metadata {libname}" for n in tqdm(names, desc=desc) if progressbar else names: imeta = self._get_metadata(libname, n) if imeta is None: imeta = {} metalist.append(imeta) if as_frame: meta = self._meta_list_to_frame(metalist, names=names) return meta else: if len(metalist) == 1 and squeeze: return metalist[0] else: return metalist
[docs] def get_oseries( self, names: list | str, return_metadata: bool = False, progressbar: bool = False, squeeze: bool = True, ) -> DataFrameOrSeries | dict | list | None: """Get oseries from database. Parameters ---------- names : str | list[str] names of the oseries to load return_metadata : bool, optional return metadata as dictionary or list of dictionaries, default is False progressbar : bool, optional show progressbar, by default False squeeze : bool, optional if True return DataFrame or Series instead of dictionary for single entry Returns ------- oseries : pandas.DataFrame or dict of DataFrames returns time series as DataFrame or dictionary of DataFrames if multiple names were passed metadata : dict | list[dict] metadata for each oseries, only returned if return_metadata=True """ oseries = self._get_series( "oseries", names, progressbar=progressbar, squeeze=squeeze ) if return_metadata: metadata = self.get_metadata( "oseries", names, progressbar=progressbar, as_frame=False, squeeze=squeeze, ) return oseries, metadata else: return oseries
[docs] def get_stresses( self, names: list[str] | str, return_metadata: bool = False, progressbar: bool = False, squeeze: bool = True, ) -> DataFrameOrSeries | dict | list | None: """Get stresses from database. Parameters ---------- names : str | list[str] names of the stresses to load return_metadata : bool, optional return metadata as dictionary or list of dictionaries, default is False progressbar : bool, optional show progressbar, by default False squeeze : bool, optional if True return DataFrame or Series instead of dictionary for single entry Returns ------- stresses : pandas.DataFrame or dict of DataFrames returns time series as DataFrame or dictionary of DataFrames if multiple names were passed metadata : dict | list[dict] metadata for each stress, only returned if return_metadata=True """ stresses = self._get_series( "stresses", names, progressbar=progressbar, squeeze=squeeze ) if return_metadata: metadata = self.get_metadata( "stresses", names, progressbar=progressbar, as_frame=False, squeeze=squeeze, ) return stresses, metadata else: return stresses
[docs] def get_stress( self, names: list | str, return_metadata: bool = False, progressbar: bool = False, squeeze: bool = True, ) -> DataFrameOrSeries | dict | list | None: """Get stresses from database. Alias for `get_stresses()` Parameters ---------- names : str | list[str] names of the stresses to load return_metadata : bool, optional return metadata as dictionary or list of dictionaries, default is False progressbar : bool, optional show progressbar, by default False squeeze : bool, optional if True return DataFrame or Series instead of dictionary for single entry Returns ------- stresses : pandas.DataFrame or dict of DataFrames returns time series as DataFrame or dictionary of DataFrames if multiple names were passed metadata : dict | list[dict] metadata for each stress, only returned if return_metadata=True """ return self.get_stresses( names, return_metadata=return_metadata, progressbar=progressbar, squeeze=squeeze, )
[docs] def get_models( self, names: list | str, return_dict: bool = False, progressbar: bool = False, squeeze: bool = True, update_ts_settings: bool = False, ) -> ps.Model | list: """Load models from database. Parameters ---------- names : str | list[str] names of the models to load return_dict : bool, optional return model dictionary instead of pastas.Model (much faster for obtaining parameters, for example) progressbar : bool, optional show progressbar, by default False squeeze : bool, optional if True return Model instead of list of Models for single entry update_ts_settings : bool, optional update time series settings based on time series in store. overwrites stored tmin/tmax in model. Returns ------- pastas.Model or list of pastas.Model return pastas model, or list of models if multiple names were passed """ models = [] names = self._parse_names(names, libname="models") desc = "Get models" for n in tqdm(names, desc=desc) if progressbar else names: data = self._get_item("models", n) if return_dict: ml = data else: ml = self._parse_model_dict(data, update_ts_settings=update_ts_settings) models.append(ml) if len(models) == 1 and squeeze: return models[0] else: return models
[docs] def get_model( self, names: list | str, return_dict: bool = False, progressbar: bool = False, squeeze: bool = True, update_ts_settings: bool = False, ) -> ps.Model | list: """Load models from database. Alias for get_models(). Parameters ---------- names : str | list[str] names of the models to load return_dict : bool, optional return model dictionary instead of pastas.Model (much faster for obtaining parameters, for example) progressbar : bool, optional show progressbar, by default False squeeze : bool, optional if True return Model instead of list of Models for single entry update_ts_settings : bool, optional update time series settings based on time series in store. overwrites stored tmin/tmax in model. Returns ------- pastas.Model or list of pastas.Model return pastas model, or list of models if multiple names were passed """ return self.get_models( names, return_dict=return_dict, progressbar=progressbar, squeeze=squeeze, update_ts_settings=update_ts_settings, )
[docs] def empty_library( self, libname: AllLibs, prompt: bool = True, progressbar: bool = True ): """Empty library of all its contents. Parameters ---------- libname : str name of the library prompt : bool, optional prompt user for input before deleting contents, by default True. Default answer is "n", user must enter 'y' to delete contents progressbar : bool, optional show progressbar, by default True """ if prompt: ui = input( f"Do you want to empty '{libname}' library of all its contents? [y/N] " ) if ui.lower() != "y": return if libname == "models": # also delete linked modelnames linked to oseries and stresses libs = ["models", "oseries_models", "stresses_models"] else: libs = [libname] # delete items and clear caches for libname in libs: names = self._parse_names(None, libname) for name in ( tqdm(names, desc=f"Deleting items from {libname}") if progressbar else names ): self._del_item(libname, name, force=True) self._clear_cache(libname) logger.info( "Emptied library %s in %s: %s", libname, self.name, self.__class__ )
[docs] def _iter_series(self, libname: TimeSeriesLibs, names: list[str] | None = None): """Iterate over time series in library (internal method). Parameters ---------- libname : str name of library (e.g. 'oseries' or 'stresses') names : list[str] | None, optional list of names, by default None, which defaults to all stored series Yields ------ pandas.Series or pandas.DataFrame time series contained in library """ names = self._parse_names(names, libname) for name in names: yield self._get_series(libname, name, progressbar=False)
[docs] def iter_oseries(self, names: list[str] | None = None): """Iterate over oseries in library. Parameters ---------- names : list[str] | None, optional list of oseries names, by default None, which defaults to all stored series Yields ------ pandas.Series or pandas.DataFrame oseries contained in library """ yield from self._iter_series("oseries", names=names)
[docs] def iter_stresses(self, names: list[str] | None = None): """Iterate over stresses in library. Parameters ---------- names : list[str] | None, optional list of stresses names, by default None, which defaults to all stored series Yields ------ pandas.Series or pandas.DataFrame stresses contained in library """ yield from self._iter_series("stresses", names=names)
[docs] def iter_models( self, modelnames: list[str] | None = None, return_dict: bool = False ): """Iterate over models in library. Parameters ---------- modelnames : list[str] | None, optional list of models to iterate over, by default None which uses all models return_dict : bool, optional if True, return model as dictionary, by default False, which returns a pastas.Model. Yields ------ pastas.Model or dict time series model """ modelnames = self._parse_names(modelnames, "models") for mlnam in modelnames: yield self.get_models(mlnam, return_dict=return_dict, progressbar=False)
[docs] def _get_model_stress_names(self, ml: ps.Model | dict) -> list[str]: """Get list of stress names used in model. Parameters ---------- ml : pastas.Model or dict model to get stress names from Returns ------- list[str] list of stress names used in model """ stresses = [] if isinstance(ml, dict): for sm in ml["stressmodels"].values(): class_key = "class" if sm[class_key] == "RechargeModel": stresses.append(sm["prec"]["name"]) stresses.append(sm["evap"]["name"]) if sm["temp"] is not None: stresses.append(sm["temp"]["name"]) elif "stress" in sm: smstress = sm["stress"] if isinstance(smstress, dict): smstress = [smstress] for s in smstress: stresses.append(s["name"]) else: for sm in ml.stressmodels.values(): # Check class name using type instead of protected _name attribute if type(sm).__name__ == "RechargeModel": stresses.append(sm.prec.name) stresses.append(sm.evap.name) if sm.temp is not None: stresses.append(sm.temp.name) elif hasattr(sm, "stress"): smstress = sm.stress if not isinstance(smstress, (list, tuple)): smstress = [smstress] for s in smstress: stresses.append(s.name) elif hasattr(sm, "stresses"): stresses += list(sm.stresses) return list(set(stresses))
[docs] def get_model_time_series_names( self, modelnames: list[str] | str | None = None, dropna: bool = True, progressbar: bool = True, ) -> DataFrameOrSeries: """Get time series names contained in model. Parameters ---------- modelnames : list[str] | str | None, optional list or name of models to get time series names for, by default None which will use all modelnames dropna : bool, optional drop stresses from table if stress is not included in any model, by default True progressbar : bool, optional show progressbar, by default True Returns ------- structure : pandas.DataFrame returns DataFrame with oseries name per model, and a flag indicating whether a stress is contained within a time series model. """ model_names = self._parse_names(modelnames, libname="models") structure = pd.DataFrame( index=pd.Index(model_names), columns=pd.Index(["oseries"] + self.stresses_names), ) structure.index.name = "model" for mlnam in ( tqdm(model_names, desc="Get model time series names") if progressbar else model_names ): mldict = self.get_models(mlnam, return_dict=True) stresses_names = self._get_model_stress_names(mldict) # oseries structure.loc[mlnam, "oseries"] = mldict["oseries"]["name"] # stresses structure.loc[mlnam, stresses_names] = 1 if dropna: return structure.dropna(how="all", axis=1) else: return structure
[docs] @staticmethod def _clear_cache(libname: AllLibs) -> None: """Clear cached property.""" if libname == "models": libname = "_modelnames_cache" getattr(BaseConnector, libname).fget.cache_clear()
[docs] def rebuild(self): """Rebuild the database. No data is lost in this operation! The reverse lookup tables are updated, and the cached properties are deleted so they are updated on next access. This method can be used to repair the database in case of corruption or inconsistencies. Use when models have been deleted from the models library, but are still present in the oseries_models and stresses_models libraries, or vice versa. """ logger.info("Rebuilding database '%s' ...", self.name) self.empty_library("oseries_models", prompt=False, progressbar=False) self.empty_library("stresses_models", prompt=False, progressbar=False) for libname in ["models", "oseries", "stresses"]: self._clear_cache(libname) self._update_time_series_model_links(progressbar=True) logger.info("Database '%s' rebuilt.", self.name)
[docs] class ModelAccessor: """Object for managing access to stored models. The ModelAccessor object allows dictionary-like assignment and access to models. In addition it provides some useful utilities for working with stored models in the database. Examples -------- Get a model by name:: >>> model = pstore.models["my_model"] Store a model in the database:: >>> pstore.models["my_model_v2"] = model Get model metadata dataframe:: >>> pstore.models.metadata Number of models:: >>> len(pstore.models) Random model:: >>> model = pstore.models.random() Iterate over stored models:: >>> for ml in pstore.models: >>> ml.solve() """ def __init__(self, conn): """Initialize model accessor. Parameters ---------- conn : pastastore.*Connector type connector """ self.conn = conn def __repr__(self): """Representation contains the number of models and the list of model names.""" return ( f"<{self.__class__.__name__}> {len(self)} model(s): \n" + self.conn.model_names.__repr__() ) def __getitem__(self, name: str): """Get model from store with model name as key. Parameters ---------- name : str name of the model """ return self.conn.get_models(name) def __setitem__(self, name: str, ml: ps.Model): """Set item. Parameters ---------- name : str name of the model ml : pastas.Model or dict model to add to the pastastore """ ml.name = name self.conn.add_model(ml, overwrite=True) def __iter__(self): """Iterate over models. Yields ------ ml : pastas.Model model """ yield from self.conn.iter_models() def __len__(self): """No. of models """ return self.conn.n_models
[docs] def random(self): """Return a random model. Returns ------- pastas.Model A random model object from the connection. """ return self.conn.get_models(choice(self.conn.model_names))
@property def metadata(self): """Dataframe with overview of models metadata.""" # NOTE: cannot be cached as this dataframe is not a property of the connector # I'm not sure how to clear this cache when models are added/removed. idx = pd.MultiIndex.from_tuples( ((k, i) for k, v in self.conn.oseries_models.items() for i in v), names=["oseries", "modelname"], ) modeldf = pd.DataFrame(index=idx) modeldf = modeldf.join(self.conn.oseries, on=modeldf.index.get_level_values(0)) # drop key_0 column if it exists if "key_0" in modeldf.columns: modeldf.drop("key_0", axis=1, inplace=True) modeldf["n_stressmodels"] = 0 for onam, mlnam in modeldf.index: mldict = self.conn.get_models(mlnam, return_dict=True) modeldf.loc[(onam, mlnam), "n_stressmodels"] = len(mldict["stressmodels"]) modeldf.loc[(onam, mlnam), "stressmodel_names"] = ",".join( list(mldict["stressmodels"].keys()) ) for setting in mldict["settings"].keys(): modeldf.loc[(onam, mlnam), setting] = mldict["settings"][setting] return modeldf