"""Module containing classes for connecting to different data stores."""
import json
import logging
import os
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from copy import deepcopy
from functools import partial
from pathlib import Path
# import weakref
from typing import Callable
import pandas as pd
from pastas.io.pas import PastasEncoder, pastas_hook
from pastastore._tqdm import process_map, tqdm
from pastastore.base import BaseConnector, ModelAccessor
from pastastore.typing import AllLibs, DataFrameOrSeries, TimeSeriesLibs
from pastastore.util import _custom_warning, metadata_from_json, series_from_json
from pastastore.validator import Validator
logger = logging.getLogger(__name__)

# Replace the default warning display hook with pastastore's _custom_warning.
warnings.showwarning = _custom_warning

# Module-level connector slot used by parallel worker processes.
# Some connectors (e.g. ArcticDBConnector) hold handles that cannot be pickled,
# so they cannot be shipped to worker processes as arguments. Instead, the
# worker initializer installed by ``_parallel()`` creates a fresh connector in
# each worker process and stores it here; the parallelizable static methods of
# ParallelUtil fall back to ``globals()["conn"]`` when no connector is passed.
# See: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor
# Kept as a plain ``None`` (no connector type annotation) to avoid circular
# import issues.
conn = None
class ParallelUtil:
"""Mix-in class for storing parallelizable methods."""
# Declared here so type checkers know these attributes exist on the mixin.
# Concrete subclasses (ArcticDBConnector, PasConnector) initialize them as
# plain booleans in __init__; _ensure_manager_proxies() upgrades them to
# multiprocessing.Manager proxies lazily when parallel workers are spawned.
_oseries_links_need_update: bool
_stresses_links_need_update: bool
def _ensure_manager_proxies(self) -> None:
"""Lazily initialize Manager proxies for cross-process communication.
Defers Manager creation until parallel workers are actually about to
be spawned, avoiding the Windows multiprocessing bootstrap error that
occurs when a Manager is created at module-import time (outside of a
``if __name__ == '__main__'`` block).
On Windows the "spawn" start method re-imports the calling script for
every new process. If a ``multiprocessing.Manager()`` is created
during that re-import, it tries to spawn another process, leading to
infinite recursion. By deferring the Manager creation to the moment
just before ``ProcessPoolExecutor`` is entered, we ensure it is only
ever created inside a proper ``__main__`` context.
"""
if not hasattr(self._oseries_links_need_update, "value"):
from multiprocessing import Manager
mgr = Manager()
# Keep a reference so the manager process is not garbage-collected
# while parallel workers are still running.
self._manager = mgr
current_o = bool(self._oseries_links_need_update)
current_s = bool(self._stresses_links_need_update)
self._oseries_links_need_update = mgr.Value(
"_oseries_links_need_update", current_o
)
self._stresses_links_need_update = mgr.Value(
"_stresses_links_need_update", current_s
)
@staticmethod
def _solve_model(
ml_name: str,
connector: BaseConnector | None = None,
report: bool = False,
ignore_solve_errors: bool = False,
**kwargs,
) -> None:
"""Solve a model in the store (internal method).
ml_name : list[str], optional
name of a model in the pastastore
connector : PasConnector, optional
Connector to use, by default None which gets the global ArcticDB
Connector. Otherwise parse a PasConnector.
report : boolean, optional
determines if a report is printed when the model is solved,
default is False
ignore_solve_errors : boolean, optional
if True, errors emerging from the solve method are ignored,
default is False which will raise an exception when a model
cannot be optimized
**kwargs : dictionary
arguments are passed to the solve method.
"""
if connector is not None:
_conn = connector
else:
_conn = globals()["conn"]
ml = _conn.get_models(ml_name)
m_kwargs = {}
for key, value in kwargs.items():
if isinstance(value, pd.Series):
m_kwargs[key] = value.loc[ml.name]
else:
m_kwargs[key] = value
# Convert timestamps
for tstamp in ["tmin", "tmax"]:
if tstamp in m_kwargs:
m_kwargs[tstamp] = pd.Timestamp(m_kwargs[tstamp])
try:
ml.solve(report=report, **m_kwargs)
except Exception as e: # pylint: disable=broad-except
if ignore_solve_errors:
warning = f"Solve error ignored for '{ml.name}': {e}"
logger.warning(warning)
else:
raise e
# store the updated model back in the database
_conn.add_model(ml, overwrite=True)
@staticmethod
def _get_statistics(
name: str,
statistics: list[str],
connector: None | BaseConnector = None,
**kwargs,
) -> pd.Series:
"""Get statistics for a model in the store (internal method).
This function was made to be run in parallel mode. For the odd user
that wants to run this function directly in sequential model using
an ArcticDBDConnector the connector argument must be passed in the kwargs
of the apply method.
"""
if connector is not None:
_conn = connector
else:
_conn = globals()["conn"]
ml = _conn.get_model(name)
series = pd.Series(index=statistics, dtype=float)
for stat in statistics:
# Note: ml.stats is part of pastas.Model public API
series.loc[stat] = getattr(ml.stats, stat)(**kwargs)
return series
@staticmethod
def _get_max_workers_and_chunksize(
max_workers: int, njobs: int, chunksize: int = None
) -> tuple[int, int]:
"""Get the maximum workers and chunksize for parallel processing.
From: https://stackoverflow.com/a/42096963/10596229
"""
max_workers = (
min(32, os.cpu_count() + 4) if max_workers is None else max_workers
)
if chunksize is None:
# chunksize controls how many items are batched into a single IPC
# round-trip to a worker process. Larger values reduce per-task
# overhead but coarsen load-balancing. The heuristic below (14
# chunks per worker) is from the SO answer linked in the docstring
# and works well when task durations are roughly uniform.
# Note: chunksize only applies to executor.map(); submit()-based
# dispatch (used in the progressbar path) ignores it entirely.
CHUNKS_PER_WORKER = 14
num_chunks = max_workers * CHUNKS_PER_WORKER
chunksize = max(njobs // num_chunks, 1)
return max_workers, chunksize
[docs]
class ArcticDBConnector(BaseConnector, ParallelUtil):
    """ArcticDBConnector object using ArcticDB to store data."""

    _conn_type = "arcticdb"

    def __init__(
        self, name: str, uri: str, verbose: bool = True, worker_process: bool = False
    ):
        """Create an ArcticDBConnector object using ArcticDB to store data.

        Parameters
        ----------
        name : str
            name of the database
        uri : str
            URI connection string (e.g. 'lmdb://<your path here>')
        verbose : bool, optional
            whether to log messages when database is initialized, by default True
        worker_process : bool, optional
            whether the connector is created in a worker process for parallel
            processing, by default False. Worker connectors skip the
            oseries/stresses-models link repair and do not write the
            pastastore config file.

        Raises
        ------
        ModuleNotFoundError
            if arcticdb is not installed.
        """
        try:
            import arcticdb
        except ModuleNotFoundError:
            logger.error("Please install arcticdb with `pip install arcticdb`!")
            raise
        # avoid warn on all metadata writes
        from arcticdb_ext import set_config_string

        set_config_string("PickledMetadata.LogLevel", "DEBUG")

        super().__init__()
        self.uri = uri
        self.name = name

        # initialize validator class to check inputs
        self._validator = Validator(self)

        # create libraries
        self.libs: dict = {}
        self.arc = arcticdb.Arctic(uri)
        self._initialize(verbose=verbose)
        self.models = ModelAccessor(self)

        # Flags start as simple booleans; they are upgraded to Manager proxies
        # lazily in _ensure_manager_proxies() when parallel processing begins.
        # This avoids spawning a Manager process at import/init time, which
        # breaks on Windows outside of a ``if __name__ == '__main__'`` guard.
        self._oseries_links_need_update = False
        self._stresses_links_need_update = False

        if not worker_process:
            # for older versions of PastaStore, if oseries_models library is empty
            # populate oseries - models database
            if (self.n_models > 0) and (
                len(self.oseries_models) == 0 or len(self.stresses_models) == 0
            ):
                self._update_time_series_model_links(recompute=False, progressbar=True)
            # write pstore file to store database info that can be used to load pstore
            if "lmdb" in self.uri.lower():
                self.write_pstore_config_file()

    def _initialize(self, verbose: bool = True) -> None:
        """Initialize the libraries (internal method).

        Creates every default library that does not exist yet and caches a
        handle to each one in ``self.libs``.

        Parameters
        ----------
        verbose : bool, optional
            log a message for libraries that already exist, by default True
        """
        if "lmdb" in self.uri.lower():  # only check for LMDB
            self.validator.check_config_connector_type(
                Path(self.uri.split("://")[1]) / self.name
            )
        for libname in self._default_library_names:
            if self._library_name(libname) not in self.arc.list_libraries():
                self.arc.create_library(self._library_name(libname))
            elif verbose:
                logger.info(
                    "ArcticDBConnector: library '%s' already exists. "
                    "Linking to existing library.",
                    self._library_name(libname),
                )
            self.libs[libname] = self._get_library(libname)

    def write_pstore_config_file(self, path: str | Path | None = None) -> None:
        """Write pstore configuration file to store database info.

        Writes ``<path>/<name>/<name>.pastastore`` containing the connector
        type, name and URI of this store.

        Parameters
        ----------
        path : str or Path, optional
            directory containing the ``<name>`` folder for the config file.
            By default None, which uses the LMDB directory parsed from the URI.
            Required when the URI is not an LMDB URI. The ``<name>`` folder is
            created if it does not exist.

        Raises
        ------
        ValueError
            if no path is given and the URI is not an LMDB URI.
        """
        # NOTE: method is not private as theoretically an ArcticDB
        # database could also be hosted in the cloud, in which case,
        # writing this config in the folder holding the database
        # is no longer possible. For those situations, the user can
        # write this config file and specify the path it should be
        # written to.
        config = {
            "connector_type": self.conn_type,
            "name": self.name,
            "uri": self.uri,
        }
        if path is None:
            if "lmdb" not in self.uri.lower():
                raise ValueError("Please provide a path to write the pastastore file!")
            path = self.uri.split("://")[1]

        # accept both str and Path input
        config_dir = Path(path) / self.name
        config_dir.mkdir(parents=True, exist_ok=True)
        with (config_dir / f"{self.name}.pastastore").open(
            "w",
            encoding="utf-8",
        ) as f:
            json.dump(config, f)

    def _library_name(self, libname: AllLibs) -> str:
        """Get full library name according to ArcticDB (internal method).

        The full name is ``"<connector name>.<libname>"``.
        """
        return ".".join([self.name, libname])

    def _get_library(self, libname: AllLibs):
        """Get ArcticDB library handle.

        Cached handles in ``self.libs`` are returned when available; otherwise
        the handle is fetched from the Arctic instance.

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        lib : arcticdb.Library handle
            handle to the library
        """
        if libname in self.libs:
            return self.libs[libname]
        return self.arc.get_library(self._library_name(libname))

    def _add_item(
        self,
        libname: AllLibs,
        item: DataFrameOrSeries | dict,
        name: str,
        metadata: dict | None = None,
        **_,
    ) -> None:
        """Add item to library (time series or model) (internal method).

        Parameters
        ----------
        libname : str
            name of the library
        item : DataFrameOrSeries | dict
            item to add, either time series or pastas.Model as dictionary
        name : str
            name of the item
        metadata : dict | None, optional
            dictionary containing metadata, by default None
        """
        lib = self._get_library(libname)

        # check file name for illegal characters
        name = self.validator.check_filename_illegal_chars(libname, name)

        # only normalizable datatypes can be written with write, else use write_pickle
        # normalizable: Series, DataFrames, Numpy Arrays
        if isinstance(item, (dict, list)):
            logger.debug(
                "Writing pickled item '%s' to ArcticDB library '%s'.", name, libname
            )
            lib.write_pickle(name, item, metadata=metadata)
        else:
            logger.debug("Writing item '%s' to ArcticDB library '%s'.", name, libname)
            lib.write(name, item, metadata=metadata)

    def _get_item(self, libname: AllLibs, name: str) -> DataFrameOrSeries | dict:
        """Retrieve item from library (internal method).

        Parameters
        ----------
        libname : str
            name of the library
        name : str
            name of the item

        Returns
        -------
        item : DataFrameOrSeries | dict
            time series or model dictionary
        """
        lib = self._get_library(libname)
        return lib.read(name).data

    def _del_item(self, libname: AllLibs, name: str, force: bool = False) -> None:
        """Delete items (series or models) (internal method).

        Parameters
        ----------
        libname : str
            name of library to delete item from
        name : str
            name of item to delete
        force : bool, optional
            force deletion even if series is used in models, by default False
        """
        lib = self._get_library(libname)
        if self.validator.PROTECT_SERIES_IN_MODELS and not force:
            self.validator.check_series_in_models(libname, name)
        lib.delete(name)

    @staticmethod
    def _worker_initializer(*args) -> None:
        """Create a worker-local ArcticDBConnector (internal method).

        Runs once in every worker process spawned by ``_parallel``. ArcticDB
        connection objects cannot be pickled, so each worker opens its own
        connector and stores it in the module-level ``conn`` variable, which
        the parallelizable static methods use when no connector is passed.

        Defined as a static method rather than a closure inside ``_parallel``
        so that it can be pickled by its qualified name. This is required by
        the "spawn" and "forkserver" multiprocessing start methods.

        Parameters
        ----------
        *args
            positional arguments for ArcticDBConnector, i.e.
            ``(name, uri, verbose)``.
        """
        # assign to module-level variable without using 'global' statement
        globals()["conn"] = ArcticDBConnector(*args, worker_process=True)

    def _parallel(
        self,
        func: Callable,
        names: list[str],
        kwargs: dict | None = None,
        progressbar: bool | None = True,
        max_workers: int | None = None,
        chunksize: int | None = None,
        desc: str = "",
        initializer: Callable | None = None,
        initargs: tuple | None = None,
    ):
        """Parallel processing of function.

        .. warning::
            When ``progressbar=True``, tasks are dispatched with
            ``submit()`` + ``as_completed()``, so results are returned in
            **completion order**, not submission order. When ``progressbar=False``,
            ``executor.map()`` is used and order is preserved. If your caller
            needs results aligned to ``names``, sort the returned list by name
            after the call.

        Note
        ----
        ArcticDB connection objects cannot be pickled, which is required for
        multiprocessing. This implementation uses an initializer function that
        creates a new ArcticDBConnector instance in each worker process and stores
        it in the global `conn` variable. User-provided functions can access this
        connector via the global `conn` variable.

        This is the standard Python multiprocessing pattern for unpicklable objects.
        See: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor

        For a connector that supports direct method passing (no global variable
        required), use PasConnector instead.

        Parameters
        ----------
        func : function
            function to apply in parallel
        names : list
            list of names to apply function to
        kwargs : dict, optional
            keyword arguments to pass to function
        progressbar : bool, optional
            show progressbar, by default True
        max_workers : int, optional
            maximum number of workers, by default None
        chunksize : int, optional
            chunksize for parallel processing, by default None
        desc : str, optional
            description for progressbar, by default ""
        initializer : Callable, optional
            function to initialize each worker process, by default None, which
            opens a worker-local ArcticDBConnector (``initargs`` is then
            replaced by ``(name, uri, False)``)
        initargs : tuple, optional
            arguments to pass to initializer function, by default None

        Returns
        -------
        list
            results of ``func`` for each name (see ordering warning above)
        """
        # Upgrade boolean flags to Manager proxies before spawning workers so
        # that state changes in child processes are visible in the main process.
        self._ensure_manager_proxies()

        max_workers, chunksize = self._get_max_workers_and_chunksize(
            max_workers, len(names), chunksize
        )

        if initializer is None:
            initializer = ArcticDBConnector._worker_initializer
            initargs = (self.name, self.uri, False)

        if initargs is None:
            initargs = ()

        if kwargs is None:
            kwargs = {}

        func_with_kwargs = partial(func, **kwargs)

        if progressbar:
            result = []
            with tqdm(total=len(names), desc=desc) as pbar:
                with ProcessPoolExecutor(
                    max_workers=max_workers, initializer=initializer, initargs=initargs
                ) as executor:
                    futures = [executor.submit(func_with_kwargs, name) for name in names]
                    for future in as_completed(futures):
                        result.append(future.result())
                        pbar.update()
        else:
            with ProcessPoolExecutor(
                max_workers=max_workers, initializer=initializer, initargs=initargs
            ) as executor:
                result = list(
                    executor.map(func_with_kwargs, names, chunksize=chunksize)
                )

        # update links if models were stored
        self._trigger_links_update_if_needed(modelnames=names)

        return result

    def _list_symbols(self, libname: AllLibs) -> list[str]:
        """List symbols in a library (internal method).

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        list
            list of symbols in the library
        """
        return self._get_library(libname).list_symbols()

    def _item_exists(self, libname: AllLibs, name: str) -> bool:
        """Check if item exists without scanning directory."""
        lib = self._get_library(libname)
        return lib.has_symbol(name)
class DictConnector(BaseConnector, ParallelUtil):
    """DictConnector object that stores timeseries and models in dictionaries."""

    _conn_type = "dict"

    def __init__(self, name: str = "pastas_db"):
        """Create DictConnector object that stores data in dictionaries.

        Parameters
        ----------
        name : str, optional
            user-specified name of the connector
        """
        super().__init__()
        self.name = name

        # create empty dictionaries for series and models
        for val in self._default_library_names:
            setattr(self, "lib_" + val, {})
        self._validator = Validator(self)
        self.models = ModelAccessor(self)

        # delayed update flags; assigned before the link check below (which may
        # consult them), consistent with ArcticDBConnector and PasConnector
        self._oseries_links_need_update = False
        self._stresses_links_need_update = False

        # for older versions of PastaStore, if oseries_models library is empty
        # populate oseries - models database
        if (self.n_models > 0) and (
            len(self.oseries_models) == 0 or len(self.stresses_models) == 0
        ):
            self._update_time_series_model_links(recompute=False, progressbar=True)

    def _get_library(self, libname: AllLibs):
        """Get reference to dictionary holding data.

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        lib : dict
            library handle
        """
        return getattr(self, f"lib_{libname}")

    def _add_item(
        self,
        libname: str,
        item: DataFrameOrSeries | dict,
        name: str,
        metadata: dict | None = None,
        **_,
    ) -> None:
        """Add item (time series or models) (internal method).

        Models and link lists are stored as-is; time series are stored as a
        ``(metadata, item)`` tuple.

        Parameters
        ----------
        libname : str
            name of library
        item : DataFrameOrSeries
            pandas.Series or pandas.DataFrame containing data
        name : str
            name of the item
        metadata : dict, optional
            dictionary containing metadata, by default None
        """
        lib = self._get_library(libname)

        # check file name for illegal characters
        name = self.validator.check_filename_illegal_chars(libname, name)

        if libname in ["models", "oseries_models", "stresses_models"]:
            lib[name] = item
        else:
            lib[name] = (metadata, item)

    def _get_item(self, libname: AllLibs, name: str) -> DataFrameOrSeries | dict:
        """Retrieve item from database (internal method).

        Parameters
        ----------
        libname : str
            name of the library
        name : str
            name of the item

        Returns
        -------
        item : DataFrameOrSeries | dict
            time series or model dictionary, modifying the returned object will not
            affect the stored data, like in a real database
        """
        lib = self._get_library(libname)
        # deepcopy calls are needed to ensure users cannot change "stored" items
        if libname in ["models", "oseries_models", "stresses_models"]:
            item = deepcopy(lib[name])
        else:
            item = deepcopy(lib[name][1])
        return item

    def _del_item(self, libname: AllLibs, name: str, force: bool = False) -> None:
        """Delete items (series or models) (internal method).

        Parameters
        ----------
        libname : str
            name of library to delete item from
        name : str
            name of item to delete
        force : bool, optional
            if True, force delete item and do not perform check if series
            is used in a model, by default False
        """
        if self.validator.PROTECT_SERIES_IN_MODELS and not force:
            self.validator.check_series_in_models(libname, name)
        lib = self._get_library(libname)
        _ = lib.pop(name)

    def _parallel(self, *args, **kwargs) -> None:
        """Parallel implementation method.

        Raises
        ------
        NotImplementedError
            DictConnector uses in-memory storage that cannot be shared across
            processes. Use PasConnector or ArcticDBConnector for parallel operations.
        """
        raise NotImplementedError(
            "DictConnector does not support parallel processing,"
            " use PasConnector or ArcticDBConnector."
        )

    def _list_symbols(self, libname: AllLibs) -> list[str]:
        """List symbols in a library (internal method).

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        list
            list of symbols in the library
        """
        lib = self._get_library(libname)
        return list(lib.keys())

    def _item_exists(self, libname: str, name: str) -> bool:
        """Check if item exists without scanning directory."""
        lib = self._get_library(libname)
        return name in lib
class PasConnector(BaseConnector, ParallelUtil):
    """PasConnector object that stores time series and models as JSON files on disk."""

    _conn_type = "pas"

    def __init__(self, name: str, path: str, verbose: bool = True):
        """Create PasConnector object that stores data as JSON files on disk.

        Uses Pastas export format (pas-files) to store files.

        Parameters
        ----------
        name : str
            user-specified name of the connector, this will be the name of the
            directory in which the data will be stored
        path : str
            path to directory for storing the data
        verbose : bool, optional
            whether to print message when database is initialized, by default True
        """
        super().__init__()
        self.name = name
        self.parentdir = Path(path)
        self.path = (self.parentdir / self.name).absolute()
        self.relpath = os.path.relpath(self.parentdir)
        self._validator = Validator(self)
        self._initialize(verbose=verbose)
        self.models = ModelAccessor(self)

        # Flags start as simple booleans; they are upgraded to Manager proxies
        # lazily in _ensure_manager_proxies() when parallel processing begins.
        # This avoids spawning a Manager process at import/init time, which
        # breaks on Windows outside of a ``if __name__ == '__main__'`` guard.
        self._oseries_links_need_update = False
        self._stresses_links_need_update = False

        # for older versions of PastaStore, if oseries_models library is empty
        # populate oseries_models library
        if (self.n_models > 0) and (
            len(self.oseries_models) == 0 or len(self.stresses_models) == 0
        ):
            self._update_time_series_model_links(recompute=False, progressbar=True)

        # write pstore file to store database info that can be used to load pstore
        self._write_pstore_config_file()

    def _initialize(self, verbose: bool = True) -> None:
        """Initialize the libraries (internal method).

        Creates one subdirectory per default library under ``self.path`` and
        stores its path as attribute ``lib_<libname>``.
        """
        self.validator.check_config_connector_type(self.path)
        for val in self._default_library_names:
            libdir = self.path / val
            if not libdir.exists():
                if verbose:
                    logger.info(
                        "PasConnector: library '%s' created in '%s'", val, libdir
                    )
                libdir.mkdir(parents=True, exist_ok=False)
            elif verbose:
                logger.info(
                    "PasConnector: library '%s' already exists. "
                    "Linking to existing directory: '%s'",
                    val,
                    libdir,
                )
            setattr(self, f"lib_{val}", self.path / val)

    def _write_pstore_config_file(self):
        """Write pstore configuration file to store database info.

        Writes ``<path>/<name>.pastastore`` with the connector type, name and
        absolute parent directory.
        """
        config = {
            "connector_type": self.conn_type,
            "name": self.name,
            "path": str(self.parentdir.absolute()),
        }
        with (self.path / f"{self.name}.pastastore").open("w", encoding="utf-8") as f:
            json.dump(config, f)

    def _get_library(self, libname: AllLibs) -> Path:
        """Get path to directory holding data.

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        lib : Path
            path to library
        """
        return Path(getattr(self, "lib_" + libname))

    def _add_item(
        self,
        libname: str,
        item: DataFrameOrSeries | dict,
        name: str,
        metadata: dict | None = None,
        **_,
    ) -> None:
        """Add item (time series or models) (internal method).

        Time series are written to ``<name>.pas`` (and metadata, if given, to
        ``<name>_meta.pas``); model dictionaries and link lists are written to
        ``<name>.pas`` as JSON.

        Parameters
        ----------
        libname : str
            name of library
        item : DataFrameOrSeries
            pandas.Series or pandas.DataFrame containing data
        name : str
            name of the item
        metadata : dict, optional
            dictionary containing metadata, by default None

        Raises
        ------
        ValueError
            if a time series name ends with ``'_meta'``, which is reserved for
            metadata files.
        """
        lib = self._get_library(libname)

        # check file name for illegal characters
        name = self.validator.check_filename_illegal_chars(libname, name)

        # time series
        if isinstance(item, pd.Series):
            item = item.to_frame()
        if isinstance(item, pd.DataFrame):
            # validate the name before doing any serialization work
            if name.endswith("_meta"):
                raise ValueError(
                    "Time series name cannot end with '_meta'. "
                    "Please use a different name for your time series."
                )
            if type(item) is pd.DataFrame:
                sjson = item.to_json(orient="columns")
            else:
                # workaround for subclasses of DataFrame that override to_json,
                # looking at you hydropandas...
                sjson = pd.DataFrame(item).to_json(orient="columns")
            fname = lib / f"{name}.pas"
            with fname.open("w", encoding="utf-8") as f:
                logger.debug("Writing time series '%s' to disk at '%s'.", name, fname)
                f.write(sjson)
            if metadata is not None:
                mjson = json.dumps(metadata, cls=PastasEncoder, indent=4)
                fname_meta = lib / f"{name}_meta.pas"
                with fname_meta.open("w", encoding="utf-8") as m:
                    logger.debug(
                        "Writing metadata '%s' to disk at '%s'.", name, fname_meta
                    )
                    m.write(mjson)
        # pastas model dict
        elif isinstance(item, dict):
            jsondict = json.dumps(item, cls=PastasEncoder, indent=4)
            fmodel = lib / f"{name}.pas"
            with fmodel.open("w", encoding="utf-8") as fm:
                logger.debug("Writing model '%s' to disk at '%s'.", name, fmodel)
                fm.write(jsondict)
        # oseries_models or stresses_models list
        elif isinstance(item, list):
            jsondict = json.dumps(item)
            fname = lib / f"{name}.pas"
            with fname.open("w", encoding="utf-8") as fm:
                logger.debug("Writing link list '%s' to disk at '%s'.", name, fname)
                fm.write(jsondict)

    def _get_item(self, libname: AllLibs, name: str) -> DataFrameOrSeries | dict:
        """Retrieve item (internal method).

        Parameters
        ----------
        libname : str
            name of the library
        name : str
            name of the item

        Returns
        -------
        item : DataFrameOrSeries | dict
            time series or model dictionary

        Raises
        ------
        FileNotFoundError
            if the item does not exist in the library.
        """
        lib = self._get_library(libname)
        fjson = lib / f"{name}.pas"
        if not fjson.exists():
            msg = f"Item '{name}' not in '{libname}' library."
            raise FileNotFoundError(msg)
        # model
        if libname == "models":
            with fjson.open("r", encoding="utf-8") as ml_json:
                item = json.load(ml_json, object_hook=pastas_hook)
        # list of models per oseries
        elif libname in ["oseries_models", "stresses_models"]:
            with fjson.open("r", encoding="utf-8") as f:
                item = json.load(f)
        # time series
        else:
            item = series_from_json(fjson)
        return item

    def _del_item(self, libname: AllLibs, name: str, force: bool = False) -> None:
        """Delete items (series or models) (internal method).

        Parameters
        ----------
        libname : str
            name of library to delete item from
        name : str
            name of item to delete
        force : bool, optional
            if True, force delete item and do not perform check if series
            is used in a model, by default False
        """
        lib = self._get_library(libname)
        if self.validator.PROTECT_SERIES_IN_MODELS and not force:
            self.validator.check_series_in_models(libname, name)
        (lib / f"{name}.pas").unlink()
        # remove metadata for time series; a missing metadata file is fine
        if libname in ["oseries", "stresses"]:
            (lib / f"{name}_meta.pas").unlink(missing_ok=True)

    def _parallel(
        self,
        func: Callable,
        names: list[str],
        kwargs: dict | None = None,
        progressbar: bool | None = True,
        max_workers: int | None = None,
        chunksize: int | None = None,
        desc: str = "",
        initializer: Callable | None = None,
        initargs: tuple | None = None,
    ):
        """Parallel processing of function.

        Functions that modify models (e.g. solving) must store their results
        in the database themselves; the values returned by ``func`` are
        collected and returned as a list.

        .. warning::
            When ``progressbar=True``, tasks are dispatched with
            ``submit()`` + ``as_completed()`` (if an ``initializer`` is given)
            or via ``process_map``, so results may be returned in
            **completion order**, not submission order. When
            ``progressbar=False``, ``executor.map()`` is used and order is
            preserved. If your caller needs results aligned to ``names``, sort
            the returned list by name after the call.

        Parameters
        ----------
        func : function
            function to apply in parallel
        names : list
            list of names to apply function to
        kwargs : dict, optional
            keyword arguments passed to ``func`` for every name
        progressbar : bool, optional
            show progressbar, by default True
        max_workers : int, optional
            maximum number of workers, by default None
        chunksize : int, optional
            chunksize for parallel processing, by default None
        desc : str, optional
            description for progressbar, by default ""
        initializer : Callable, optional
            function to initialize each worker process, by default None.
            Honored both with and without progressbar.
        initargs : tuple, optional
            arguments to pass to initializer function, by default None
            (no arguments)

        Returns
        -------
        list
            results of ``func`` for each name (see ordering warning above)
        """
        # Upgrade boolean flags to Manager proxies before spawning workers so
        # that state changes in child processes are visible in the main process.
        self._ensure_manager_proxies()

        max_workers, chunksize = self._get_max_workers_and_chunksize(
            max_workers, len(names), chunksize
        )

        if kwargs is None:
            kwargs = {}
        # ProcessPoolExecutor calls initializer(*initargs): None is not valid
        if initargs is None:
            initargs = ()

        func_with_kwargs = partial(func, **kwargs)

        if progressbar and initializer is None:
            result = process_map(
                func_with_kwargs,
                names,
                max_workers=max_workers,
                chunksize=chunksize,
                desc=desc,
                total=len(names),
            )
        elif progressbar:
            result = []
            with tqdm(total=len(names), desc=desc) as pbar:
                with ProcessPoolExecutor(
                    max_workers=max_workers,
                    initializer=initializer,
                    initargs=initargs,
                ) as executor:
                    futures = [executor.submit(func_with_kwargs, name) for name in names]
                    for future in as_completed(futures):
                        result.append(future.result())
                        pbar.update()
        else:
            with ProcessPoolExecutor(
                max_workers=max_workers,
                initializer=initializer,
                initargs=initargs,
            ) as executor:
                result = list(
                    executor.map(func_with_kwargs, names, chunksize=chunksize)
                )

        # update links if models were stored
        self._trigger_links_update_if_needed(modelnames=names)

        return result

    def _list_symbols(self, libname: AllLibs) -> list[str]:
        """List symbols in a library (internal method).

        Metadata files (``*_meta.pas``) are excluded.

        Parameters
        ----------
        libname : str
            name of the library

        Returns
        -------
        list
            list of symbols in the library
        """
        lib = self._get_library(libname)
        return [i.stem for i in lib.glob("*.pas") if not i.stem.endswith("_meta")]

    def _item_exists(self, libname: str, name: str) -> bool:
        """Check if item exists without scanning directory."""
        lib = self._get_library(libname)
        path = lib / f"{name}.pas"
        return path.exists()