Source code for pastastore.util

import os
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
from numpy.lib._iotools import NameValidator
from pandas.testing import assert_series_equal
from pastas.stats.tests import runs_test, stoffer_toloi
from tqdm.auto import tqdm

from pastastore.version import PASTAS_LEQ_022


def _custom_warning(message, category=UserWarning, filename="", lineno=-1, *args):
    print(f"{filename}:{lineno}: {category.__name__}: {message}")


class ItemInLibraryException(Exception):
    pass


def delete_pystore_connector(
    conn=None,
    path: Optional[str] = None,
    name: Optional[str] = None,
    libraries: Optional[List[str]] = None,
) -> None:  # pragma: no cover
    """Delete libraries from pystore.

    Parameters
    ----------
    conn : PystoreConnector, optional
        PystoreConnector object
    path : str, optional
        path to the pystore
    name : str, optional
        name of the pystore
    libraries : Optional[List[str]], optional
        list of library names to delete, by default None which deletes
        all libraries
    """
    import pystore

    if conn is not None:
        name = conn.name
        path = conn.path
    elif name is None or path is None:
        raise ValueError("Please provide 'name' and 'path' OR 'conn'!")

    print(f"Deleting PystoreConnector database: '{name}' ...", end="")
    pystore.set_path(path)
    if libraries is None:
        pystore.delete_store(name)
        print(" Done!")
    else:
        store = pystore.store(name)
        for lib in libraries:
            print()
            store.delete_collection(lib)
            print(f" - deleted: {lib}")


def delete_arctic_connector(
    conn=None,
    connstr: Optional[str] = None,
    name: Optional[str] = None,
    libraries: Optional[List[str]] = None,
) -> None:  # pragma: no cover
    """Delete libraries from arctic database.

    Parameters
    ----------
    conn : pastastore.ArcticConnector, optional
        ArcticConnector object
    connstr : str, optional
        connection string to the database
    name : str, optional
        name of the database
    libraries : Optional[List[str]], optional
        list of library names to delete, by default None which deletes
        all libraries
    """
    import arctic

    if conn is not None:
        name = conn.name
        connstr = conn.connstr
    elif name is None or connstr is None:
        raise ValueError("Provide 'name' and 'connstr' OR 'conn'!")

    arc = arctic.Arctic(connstr)
    print(f"Deleting ArcticConnector database: '{name}' ... ", end="")

    # get library names
    if libraries is None:
        libs = []
        for ilib in arc.list_libraries():
            if ilib.split(".")[0] == name:
                libs.append(ilib)
    elif name is not None:
        libs = [name + "." + ilib for ilib in libraries]
    else:
        raise ValueError("Provide 'name' and 'connstr' OR 'conn'!")

    for lib in libs:
        arc.delete_library(lib)
        if libraries is not None:
            print()
            print(f" - deleted: {lib}")
    print("Done!")


def delete_arcticdb_connector(
    conn=None,
    uri: Optional[str] = None,
    name: Optional[str] = None,
    libraries: Optional[List[str]] = None,
) -> None:
    """Delete libraries from an ArcticDB database.

    Parameters
    ----------
    conn : pastastore.ArcticDBConnector, optional
        ArcticDBConnector object
    uri : str, optional
        uri connection string to the database
    name : str, optional
        name of the database
    libraries : Optional[List[str]], optional
        list of library names to delete, by default None which deletes
        all libraries
    """
    import shutil

    import arcticdb

    if conn is not None:
        name = conn.name
        uri = conn.uri
    elif name is None or uri is None:
        raise ValueError("Provide 'name' and 'uri' OR 'conn'!")

    arc = arcticdb.Arctic(uri)
    print(f"Deleting ArcticDBConnector database: '{name}' ... ", end="")

    # get library names
    if libraries is None:
        libs = []
        for ilib in arc.list_libraries():
            if ilib.split(".")[0] == name:
                # TODO: remove replace when arcticdb is able to delete
                libs.append(ilib.replace(".", "/"))
    elif name is not None:
        # TODO: replace / with . when arcticdb is able to delete
        libs = [name + "/" + ilib for ilib in libraries]
    else:
        raise ValueError("Provide 'name' and 'uri' OR 'conn'!")

    for lib in libs:
        # arc.delete_library(lib)  # TODO: not working at the moment
        # use the local 'uri' so this also works when no conn is passed
        shutil.rmtree(os.path.join(uri.split("//")[-1], lib))
        if libraries is not None:
            print()
            print(f" - deleted: {lib}")

    # compare the database name, not the full library name
    remaining = [ilib for ilib in arc.list_libraries() if ilib.split(".")[0] == name]
    if len(remaining) == 0:
        shutil.rmtree(os.path.join(uri.split("//")[-1], name))
    print("Done!")


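# Usage sketch (illustrative, not part of the original module): delete a
# single library from an ArcticDB database by passing 'uri' and 'name'
# directly, without constructing a connector first. The uri, database name
# and library name below are hypothetical.
def _example_delete_arcticdb_library():  # pragma: no cover
    delete_arcticdb_connector(
        uri="lmdb://./arcticdb", name="my_db", libraries=["models"]
    )

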
def delete_dict_connector(conn, libraries: Optional[List[str]] = None) -> None:
    print(f"Deleting DictConnector: '{conn.name}' ... ", end="")
    if libraries is None:
        # note: only removes the local reference to the connector
        del conn
        print(" Done!")
    else:
        for lib in libraries:
            print()
            delattr(conn, f"lib_{conn.libname[lib]}")
            print(f" - deleted: {lib}")
        print("Done!")


def delete_pas_connector(conn, libraries: Optional[List[str]] = None) -> None:
    import shutil

    print(f"Deleting PasConnector database: '{conn.name}' ... ", end="")
    if libraries is None:
        shutil.rmtree(conn.path)
        print(" Done!")
    else:
        for lib in libraries:
            print()
            shutil.rmtree(os.path.join(conn.path, lib))
            print(f" - deleted: {lib}")
        print("Done!")


def delete_pastastore(pstore, libraries: Optional[List[str]] = None) -> None:
    """Delete libraries from PastaStore.

    Note
    ----
    This deletes the original PastaStore object. To access data that has
    not been deleted, it is recommended to create a new PastaStore object
    with the same Connector settings. This also creates new empty
    libraries if they were deleted.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        PastaStore object to delete (from)
    libraries : Optional[List[str]], optional
        list of library names to delete, by default None which deletes
        all libraries

    Raises
    ------
    TypeError
        when Connector type is not recognized
    """
    if pstore.conn.conn_type == "pystore":
        delete_pystore_connector(conn=pstore.conn, libraries=libraries)
    elif pstore.conn.conn_type == "dict":
        # pass the connector (not the store) and forward 'libraries'
        delete_dict_connector(pstore.conn, libraries=libraries)
    elif pstore.conn.conn_type == "arctic":
        delete_arctic_connector(conn=pstore.conn, libraries=libraries)
    elif pstore.conn.conn_type == "arcticdb":
        delete_arcticdb_connector(conn=pstore.conn, libraries=libraries)
    elif pstore.conn.conn_type == "pas":
        delete_pas_connector(conn=pstore.conn, libraries=libraries)
    else:
        raise TypeError(
            f"Unrecognized pastastore Connector type: {pstore.conn.conn_type}"
        )


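# Usage sketch (illustrative, not part of the original module): clear the
# 'models' library of a PastaStore, then re-create the store to obtain
# fresh empty libraries, as recommended in the docstring above. The
# database name and path are hypothetical, and the connector-first
# PastaStore signature is assumed.
def _example_delete_pastastore():  # pragma: no cover
    import pastastore as pst

    conn = pst.PasConnector("my_db", "./pastas_db")
    pstore = pst.PastaStore(conn)
    delete_pastastore(pstore, libraries=["models"])  # oseries/stresses kept
    # re-create the store with the same connector settings
    return pst.PastaStore(pst.PasConnector("my_db", "./pastas_db"))

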
def validate_names(
    s: Optional[str] = None,
    d: Optional[dict] = None,
    replace_space: Optional[str] = "_",
    deletechars: Optional[str] = None,
    **kwargs,
) -> Union[str, Dict]:
    """Remove invalid characters from string or dictionary keys.

    Parameters
    ----------
    s : str, optional
        remove invalid characters from string
    d : dict, optional
        remove invalid characters from keys of dictionary
    replace_space : str, optional
        replace spaces by this character, by default "_"
    deletechars : str, optional
        a string combining invalid characters, by default None

    Returns
    -------
    str, dict
        string or dict with invalid characters removed
    """
    validator = NameValidator(
        replace_space=replace_space, deletechars=deletechars, **kwargs
    )
    if s is not None:
        new_str = validator(s)  # returns a tuple
        if len(new_str) == 1:
            return new_str[0]
        else:
            return new_str
    elif d is not None:
        new_dict = {}
        for k, v in d.items():
            new_dict[validator(k)[0]] = v
        return new_dict
    else:
        raise ValueError("Provide one of 's' (string) or 'd' (dict)!")


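# Usage sketch (illustrative, not part of the original module): clean a
# series name and metadata keys before storing them. With numpy's
# NameValidator defaults, spaces become the replace_space character and
# characters such as parentheses are dropped, e.g. "well 1 (south)" is
# expected to become "well_1_south". The names below are hypothetical.
def _example_validate_names():  # pragma: no cover
    cleaned_name = validate_names(s="well 1 (south)")
    cleaned_meta = validate_names(d={"screen depth": -10.0, "x coord": 100.0})
    return cleaned_name, cleaned_meta

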
def compare_models(ml1, ml2, stats=None, detailed_comparison=False):
    """Compare two Pastas models.

    Parameters
    ----------
    ml1 : pastas.Model
        first model to compare
    ml2 : pastas.Model
        second model to compare
    stats : list of str, optional
        if provided, compare these model statistics
    detailed_comparison : bool, optional
        if True return DataFrame containing comparison details,
        by default False which returns True if models are equivalent
        or False if they are not

    Returns
    -------
    bool or pd.DataFrame
        returns True if models are equivalent when detailed_comparison=False,
        else returns DataFrame containing comparison details.
    """
    df = pd.DataFrame(columns=["model 0", "model 1"])

    so1 = []  # for storing series_original
    sv1 = []  # for storing series_validated
    ss1 = []  # for storing series

    for i, ml in enumerate([ml1, ml2]):
        counter = 0  # for counting stress time series
        df.loc["name:", f"model {i}"] = ml.name

        for k in ml.settings.keys():
            df.loc[f"- settings: {k}", f"model {i}"] = ml.settings.get(k)

        if i == 0:
            oso = (
                ml.oseries.series_original
                if PASTAS_LEQ_022
                else ml.oseries._series_original
            )
            df.loc["oseries: series_original", f"model {i}"] = True
            if PASTAS_LEQ_022:
                osv = ml.oseries.series_validated
                df.loc["oseries: series_validated", f"model {i}"] = True
            oss = ml.oseries.series
            df.loc["oseries: series_series", f"model {i}"] = True
        elif i == 1:
            try:
                assert_series_equal(
                    oso,
                    (
                        ml.oseries.series_original
                        if PASTAS_LEQ_022
                        else ml.oseries._series_original
                    ),
                )
                compare_oso = True
            except (ValueError, AssertionError):
                # series are not identical in length or index does not match
                compare_oso = False
            if PASTAS_LEQ_022:
                try:
                    assert_series_equal(osv, ml.oseries.series_validated)
                    compare_osv = True
                except (ValueError, AssertionError):
                    # series are not identical in length or index does not match
                    compare_osv = False
            try:
                assert_series_equal(oss, ml.oseries.series)
                compare_oss = True
            except (ValueError, AssertionError):
                # series are not identical in length or index does not match
                compare_oss = False

            df.loc["oseries: series_original", f"model {i}"] = compare_oso
            if PASTAS_LEQ_022:
                df.loc["oseries: series_validated", f"model {i}"] = compare_osv
            df.loc["oseries: series_series", f"model {i}"] = compare_oss

        for sm_name, sm in ml.stressmodels.items():
            df.loc[f"stressmodel: '{sm_name}'"] = sm_name
            df.loc["- rfunc"] = sm.rfunc._name if sm.rfunc is not None else "NA"
            if sm._name == "RechargeModel":
                stresses = [sm.prec, sm.evap]
            else:
                stresses = sm.stress
            for ts in stresses:
                df.loc[f"- time series: '{ts.name}'"] = ts.name
                for tsk in ts.settings.keys():
                    df.loc[f" - {ts.name} settings: {tsk}", f"model {i}"] = (
                        ts.settings[tsk]
                    )
                if i == 0:
                    if PASTAS_LEQ_022:
                        so1.append(ts.series_original.copy())
                        sv1.append(ts.series_validated.copy())
                    else:
                        so1.append(ts._series_original.copy())
                    ss1.append(ts.series.copy())
                    if PASTAS_LEQ_022:
                        df.loc[f" - {ts.name}: series_validated"] = True
                    df.loc[f" - {ts.name}: series_original"] = True
                    df.loc[f" - {ts.name}: series"] = True
                elif i == 1:
                    # ValueError if series cannot be compared,
                    # set result to False
                    try:
                        assert_series_equal(
                            so1[counter],
                            (
                                ts.series_original
                                if PASTAS_LEQ_022
                                else ts._series_original
                            ),
                        )
                        compare_so1 = True
                    except (ValueError, AssertionError):
                        compare_so1 = False
                    if PASTAS_LEQ_022:
                        try:
                            assert_series_equal(sv1[counter], ts.series_validated)
                            compare_sv1 = True
                        except (ValueError, AssertionError):
                            compare_sv1 = False
                    try:
                        assert_series_equal(ss1[counter], ts.series)
                        compare_ss1 = True
                    except (ValueError, AssertionError):
                        compare_ss1 = False

                    df.loc[f" - {ts.name}: series_original"] = compare_so1
                    if PASTAS_LEQ_022:
                        df.loc[f" - {ts.name}: series_validated"] = compare_sv1
                    df.loc[f" - {ts.name}: series"] = compare_ss1
                counter += 1

        for p in ml.parameters.index:
            df.loc[f"param: {p} (init)", f"model {i}"] = ml.parameters.loc[
                p, "initial"
            ]
            df.loc[f"param: {p} (opt)", f"model {i}"] = ml.parameters.loc[p, "optimal"]

        if stats:
            stats_df = ml.stats.summary(stats=stats)
            for s in stats:
                df.loc[f"stats: {s}", f"model {i}"] = stats_df.loc[s, "Value"]

    # compare
    df["comparison"] = df.iloc[:, 0] == df.iloc[:, 1]

    # isclose for params
    param_mask = df.index.str.startswith("param: ")
    df.loc[param_mask, "comparison"] = np.isclose(
        df.loc[param_mask, "model 0"].astype(float).values,
        df.loc[param_mask, "model 1"].astype(float).values,
    )

    # ensure NaN == NaN is not counted as a difference
    nanmask = df.iloc[:, 0].isna() & df.iloc[:, 1].isna()
    df.loc[nanmask, "comparison"] = True

    # for stats the comparison must be almost equal
    if stats:
        stats_idx = [f"stats: {s}" for s in stats]
        b = np.isclose(
            df.loc[stats_idx, "model 0"].astype(float).values,
            df.loc[stats_idx, "model 1"].astype(float).values,
        )
        df.loc[stats_idx, "comparison"] = b

    if detailed_comparison:
        return df
    else:
        return df["comparison"].iloc[1:].all()  # ignore name difference


def copy_database(
    conn1,
    conn2,
    libraries: Optional[List[str]] = None,
    overwrite: bool = False,
    progressbar: bool = False,
) -> None:
    """Copy libraries from one database to another.

    Parameters
    ----------
    conn1 : pastastore.*Connector
        source Connector containing link to current database containing data
    conn2 : pastastore.*Connector
        destination Connector with link to database to which you want to copy
    libraries : Optional[List[str]], optional
        list of str containing names of libraries to copy, by default None,
        which copies all libraries: ['oseries', 'stresses', 'models']
    overwrite : bool, optional
        overwrite data in destination database, by default False
    progressbar : bool, optional
        show progressbars, by default False

    Raises
    ------
    ValueError
        if library name is not understood
    """
    if libraries is None:
        libraries = ["oseries", "stresses", "models"]

    for lib in libraries:
        if lib == "oseries":
            for name in (
                tqdm(conn1.oseries_names, desc="copying oseries")
                if progressbar
                else conn1.oseries_names
            ):
                o, meta = conn1.get_oseries(name, return_metadata=True)
                conn2.add_oseries(o, name, metadata=meta, overwrite=overwrite)
        elif lib == "stresses":
            for name in (
                tqdm(conn1.stresses_names, desc="copying stresses")
                if progressbar
                else conn1.stresses_names
            ):
                s, meta = conn1.get_stresses(name, return_metadata=True)
                conn2.add_stress(
                    s,
                    name,
                    kind=meta["kind"],
                    metadata=meta,
                    overwrite=overwrite,
                )
        elif lib == "models":
            for name in (
                tqdm(conn1.model_names, desc="copying models")
                if progressbar
                else conn1.model_names
            ):
                mldict = conn1.get_models(name, return_dict=True)
                conn2.add_model(mldict, overwrite=overwrite)
        else:
            raise ValueError(f"Library name '{lib}' not recognized!")


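# Usage sketch (illustrative, not part of the original module): copy all
# libraries from an in-memory DictConnector to an on-disk PasConnector,
# e.g. to persist a temporary database. Names and path are hypothetical.
def _example_copy_database():  # pragma: no cover
    import pastastore as pst

    source = pst.DictConnector("temp_db")
    target = pst.PasConnector("archive_db", "./pastas_db")
    copy_database(source, target, overwrite=False, progressbar=True)

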
def frontiers_checks(
    pstore,
    modelnames: Optional[List[str]] = None,
    oseries: Optional[List[str]] = None,
    check1_rsq: bool = True,
    check1_threshold: float = 0.7,
    check2_autocor: bool = True,
    check2_test: str = "runs",
    check2_pvalue: float = 0.05,
    check3_tmem: bool = True,
    check3_cutoff: float = 0.95,
    check4_gain: bool = True,
    check5_parambounds: bool = False,
    csv_dir: Optional[str] = None,
) -> pd.DataFrame:  # pragma: no cover
    """Check models in a PastaStore to see if they pass reliability criteria.

    The reliability criteria are taken from Brakenhoff et al. 2022 [bra_2022]_.
    These criteria were applied in a region with recharge, river levels and
    pumping wells as stresses. This is by no means an exhaustive list of
    reliability criteria but might serve as a reasonable starting point for
    model diagnostic checking.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        reference to a PastaStore
    modelnames : list of str, optional
        list of model names to consider, if None uses 'oseries'; if both
        are None, all stored models will be checked
    oseries : list of str, optional
        list of oseries to consider, corresponding models will be picked
        up from the pastastore; if None, all stored models are checked
    check1_rsq : bool, optional
        check if the model fit exceeds a threshold of the coefficient of
        determination $R^2$, by default True
    check1_threshold : float, optional
        threshold of the $R^2$ fit statistic, by default 0.7
    check2_autocor : bool, optional
        check if the noise of the model shows autocorrelation with a
        statistical test, by default True
    check2_test : str, optional
        statistical test for autocorrelation. Available options are the
        Runs test "runs", Stoffer-Toloi "stoffer" or "both",
        by default "runs"
    check2_pvalue : float, optional
        p-value for the statistical test defining the confidence interval,
        by default 0.05
    check3_tmem : bool, optional
        check if the length of the response time falls within the
        calibration period, by default True
    check3_cutoff : float, optional
        cutoff used to compute the response time, by default 0.95
    check4_gain : bool, optional
        check if the gain is significant relative to its uncertainty
        (|gain| > 2 * standard deviation of the gain), by default True
    check5_parambounds : bool, optional
        check if parameters hit the parameter bounds, by default False
    csv_dir : string, optional
        directory to store a CSV file with an overview of the checks for
        every model, by default None which does not store results

    Returns
    -------
    df : pandas.DataFrame
        DataFrame with all models and whether or not they pass the
        reliability checks

    References
    ----------
    .. [bra_2022] Brakenhoff, D.A., Vonk M.A., Collenteur, R.A., van Baar, M.,
       Bakker, M.: Application of Time Series Analysis to Estimate Drawdown
       From Multiple Well Fields. Front. Earth Sci., 14 June 2022
       doi:10.3389/feart.2022.907609
    """
    df = pd.DataFrame(columns=["all_checks_passed"])

    if modelnames is not None:
        models = modelnames
        if oseries is not None:
            print(
                "Warning! Both 'modelnames' and 'oseries' provided,"
                " only using 'modelnames'!"
            )
    elif oseries is not None:
        models = []
        for o in oseries:
            models += pstore.oseries_models[o]
    else:
        models = pstore.model_names

    for mlnam in tqdm(models, desc="Running model diagnostics"):
        ml = pstore.get_models(mlnam)

        if ml.parameters["optimal"].hasnans:
            print(f"Warning! Skipping model '{mlnam}' because it is not solved!")
            continue

        checks = pd.DataFrame(columns=["stat", "threshold", "units", "check_passed"])

        # Check 1 - Fit Statistic
        if check1_rsq:
            rsq = ml.stats.rsq()
            check_rsq_passed = rsq >= check1_threshold
            checks.loc["rsq >= threshold", :] = (
                rsq,
                check1_threshold,
                "-",
                check_rsq_passed,
            )

        # Check 2 - Autocorrelation Noise
        if check2_autocor:
            noise = ml.noise()
            if noise is None:
                noise = ml.residuals()
                print(
                    "Warning! Checking autocorrelation on the residuals, "
                    "not the noise!"
                )
            if check2_test == "runs" or check2_test == "both":
                _, p_runs = runs_test(noise.iloc[1:])
                if p_runs > check2_pvalue:
                    # no significant autocorrelation
                    check_runs_acf_passed = True
                else:
                    # significant autocorrelation
                    check_runs_acf_passed = False
                checks.loc["ACF: Runs test", :] = (
                    p_runs,
                    check2_pvalue,
                    "-",
                    check_runs_acf_passed,
                )
            if check2_test == "stoffer" or check2_test == "both":
                _, p_stoffer = stoffer_toloi(
                    noise.iloc[1:], snap_to_equidistant_timestamps=True
                )
                if p_stoffer > check2_pvalue:
                    check_st_acf_passed = True
                else:
                    check_st_acf_passed = False
                checks.loc["ACF: Stoffer-Toloi test", :] = (
                    p_stoffer,
                    check2_pvalue,
                    "-",
                    check_st_acf_passed,
                )

        # Check 3 - Response Time
        if check3_tmem:
            len_oseries_calib = (ml.settings["tmax"] - ml.settings["tmin"]).days
            for sm_name, sm in ml.stressmodels.items():
                if sm._name == "WellModel":
                    nwells = sm.distances.index.size
                    for iw in range(nwells):
                        p = sm.get_parameters(model=ml, istress=iw)
                        t = sm.rfunc.get_t(p, dt=1, cutoff=0.999)
                        step = sm.rfunc.step(p, cutoff=0.999) / sm.rfunc.gain(p)
                        tmem = np.interp(check3_cutoff, step, t)
                        check_tmem_passed = tmem < len_oseries_calib / 2
                        idxlbl = (
                            f"calib_period > 2*t_mem_95%: "
                            f"{sm_name}-{iw:02g} ({sm.distances.index[iw]})"
                        )
                        checks.loc[idxlbl, :] = (
                            tmem,
                            len_oseries_calib,
                            "days",
                            check_tmem_passed,
                        )
                else:
                    tmem = ml.get_response_tmax(sm_name, cutoff=check3_cutoff)
                    if tmem is None:  # no rfunc in stressmodel
                        tmem = 0
                    check_tmem_passed = tmem < len_oseries_calib / 2
                    checks.loc[f"calib_period > 2*t_mem_95%: {sm_name}", :] = (
                        tmem,
                        len_oseries_calib,
                        "days",
                        check_tmem_passed,
                    )

        # Check 4 - Uncertainty Gain
        if check4_gain:
            for sm_name, sm in ml.stressmodels.items():
                if sm._name == "WellModel":
                    for iw in range(sm.distances.index.size):
                        p = sm.get_parameters(model=ml, istress=iw)
                        gain = sm.rfunc.gain(p)
                        gain_std = np.sqrt(sm.variance_gain(model=ml, istress=iw))
                        if gain_std is None:
                            gain_std = np.nan
                            check_gain_passed = pd.NA
                        elif np.isnan(gain_std):
                            check_gain_passed = pd.NA
                        else:
                            check_gain_passed = np.abs(gain) > 2 * gain_std
                        checks.loc[
                            f"gain > 2*std: {sm_name}-{iw:02g} "
                            f"({sm.distances.index[iw]})",
                            :,
                        ] = (
                            gain,
                            2 * gain_std,
                            "(unit head)/(unit well stress)",
                            check_gain_passed,
                        )
                    continue
                elif sm._name == "LinearTrend":
                    gain = ml.parameters.loc[f"{sm_name}_a", "optimal"]
                    gain_std = ml.parameters.loc[f"{sm_name}_a", "stderr"]
                elif sm._name == "StepModel":
                    gain = ml.parameters.loc[f"{sm_name}_d", "optimal"]
                    gain_std = ml.parameters.loc[f"{sm_name}_d", "stderr"]
                else:
                    gain = ml.parameters.loc[f"{sm_name}_A", "optimal"]
                    gain_std = ml.parameters.loc[f"{sm_name}_A", "stderr"]
                if gain_std is None:
                    gain_std = np.nan
                    check_gain_passed = pd.NA
                elif np.isnan(gain_std):
                    check_gain_passed = pd.NA
                else:
                    check_gain_passed = np.abs(gain) > 2 * gain_std
                checks.loc[f"gain > 2*std: {sm_name}", :] = (
                    gain,
                    2 * gain_std,
                    "(unit head)/(unit well stress)",
                    check_gain_passed,
                )

        # Check 5 - Parameter Bounds
        if check5_parambounds:
            upper, lower = ml._check_parameters_bounds()
            for param in ml.parameters.index:
                bounds = (
                    ml.parameters.loc[param, "pmin"],
                    ml.parameters.loc[param, "pmax"],
                )
                b = ~(upper.loc[param] or lower.loc[param])
                checks.loc[f"Parameter bounds: {param}", :] = (
                    ml.parameters.loc[param, "optimal"],
                    bounds,
                    "-",
                    b,
                )

        df.loc[mlnam, "all_checks_passed"] = checks["check_passed"].all()
        df.loc[mlnam, checks.index] = checks.loc[:, "check_passed"]

        if csv_dir:
            checks.to_csv(f"{csv_dir}/checks_{ml.name}.csv", na_rep="NaN")

    return df


def frontiers_aic_select(
    pstore,
    modelnames: Optional[List[str]] = None,
    oseries: Optional[List[str]] = None,
    full_output: bool = False,
) -> pd.DataFrame:  # pragma: no cover
    """Select the best model structure based on the minimum AIC.

    As proposed by Brakenhoff et al. 2022 [bra_2022]_.

    Parameters
    ----------
    pstore : pastastore.PastaStore
        reference to a PastaStore
    modelnames : list of str, optional
        list of model names (that pass reliability criteria)
    oseries : list of str, optional
        list of locations for which to select models, note that this uses
        all models associated with a specific location
    full_output : bool, optional
        if set to True, returns a DataFrame including all models per
        location and their AIC values

    Returns
    -------
    pandas.DataFrame
        DataFrame with the selected best model per location based on the
        AIC, or a DataFrame containing statistics for each of the models
        per location (if full_output=True)

    References
    ----------
    .. [bra_2022] Brakenhoff, D.A., Vonk M.A., Collenteur, R.A., van Baar, M.,
       Bakker, M.: Application of Time Series Analysis to Estimate Drawdown
       From Multiple Well Fields. Front. Earth Sci., 14 June 2022
       doi:10.3389/feart.2022.907609
    """
    if modelnames is None and oseries is None:
        modelnames = pstore.model_names
    elif modelnames is None and oseries is not None:
        modelnames = []
        for o in oseries:
            modelnames += pstore.oseries_models[o]
    elif oseries is not None:
        print(
            "Warning! Both 'modelnames' and 'oseries' provided, "
            "using only 'modelnames'"
        )

    # DataFrame of models with their corresponding oseries
    df = pstore.get_model_timeseries_names(modelnames, progressbar=False).loc[
        :, ["oseries"]
    ]
    # AIC of each model
    aic = pstore.get_statistics(["aic"], modelnames, progressbar=True)

    if full_output:
        # group models per location, obtain the AIC, and identify the model
        # with the lowest AIC per location
        collect = []
        gr = df.join(aic).groupby("oseries")
        for o, idf in gr:
            idf.index.name = "modelname"
            idf = (
                idf.sort_values("aic").reset_index().set_index(["oseries", "modelname"])
            )
            idf = idf.rename(columns={"aic": "AIC"})
            idf["dAIC"] = idf["AIC"] - idf["AIC"].min()
            idf = idf.replace(0.0, np.nan)
            collect.append(idf)
        return pd.concat(collect, axis=0)
    else:
        return (
            df.join(aic).groupby("oseries").idxmin().rename(columns={"aic": "min_aic"})
        )


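# Usage sketch (illustrative, not part of the original module): combine the
# two frontiers functions, selecting per location the lowest-AIC model
# among those that passed the reliability checks. The store and model list
# are hypothetical.
def _example_frontiers_aic_select(pstore, reliable_models):  # pragma: no cover
    best = frontiers_aic_select(pstore, modelnames=reliable_models)
    # full_output=True lists every candidate model per location together
    # with its AIC and dAIC (the difference to the location minimum)
    overview = frontiers_aic_select(
        pstore, modelnames=reliable_models, full_output=True
    )
    return best, overview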