Parallel processing with Pastastore

This notebook shows parallel processing capabilities of PastaStore.

Note

Parallel processing is platform dependent and may not always work. The current implementation works well on Linux, though this will likely change with Python 3.14 and higher, where the default multiprocessing start method on Linux switches from "fork" to "forkserver". On Windows, parallel solving does not work when called directly from Jupyter Notebooks or IPython. To use parallel solving on Windows, place the following code in a Python file and run it as a script:

from multiprocessing import freeze_support

if __name__ == "__main__":
    freeze_support()
    # pstore is an existing PastaStore; some_func takes a model name
    pstore.apply("models", some_func, parallel=True)

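Whether parallel processing works depends to a large extent on the multiprocessing start method. As a hedged, standard-library-only sketch (not part of PastaStore; the helper name describe_start_methods is hypothetical), the snippet below reports the default and the available start methods on your platform. With "spawn", the default on Windows and macOS, every worker re-imports the main module, which is why the code that starts workers must sit behind the if __name__ == "__main__": guard.

```python
import multiprocessing as mp


def describe_start_methods() -> dict:
    """Return the default and the available multiprocessing start methods.

    Hypothetical helper for illustration; it does not start any processes.
    """
    return {
        # "fork" on Linux up to Python 3.13 and "forkserver" from 3.14;
        # "spawn" on Windows and macOS
        "default": mp.get_start_method(allow_none=False),
        "available": mp.get_all_start_methods(),
    }


if __name__ == "__main__":
    # freeze_support() matters for frozen Windows executables; the __main__
    # guard itself keeps spawned workers from re-running this block.
    mp.freeze_support()
    print(describe_start_methods())
```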
import pastas as ps

import pastastore as pst
from pastastore.datasets import example_pastastore

ps.set_log_level("ERROR")  # silence Pastas logger for this notebook
pst.get_color_logger("INFO", "pastastore")
pst.show_versions()
Pastastore version : 1.12.0

Python version     : 3.13.11
Pandas version     : 2.3.3
Matplotlib version : 3.10.8
Pastas version     : 1.12.0
PyYAML version     : 6.0.3

Example pastastore

Load some example data, create models and solve them to showcase parallel processing.

# get the example pastastore
conn = pst.PasConnector("my_connector", "./temp")
# conn = pst.ArcticDBConnector("my_connector", "lmdb://./temp")
pstore = example_pastastore(conn)
pstore.create_models_bulk();
PasConnector: library 'oseries' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries'
PasConnector: library 'stresses' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/stresses'
PasConnector: library 'models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/models'
PasConnector: library 'oseries_models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/oseries_models'
PasConnector: library 'stresses_models' created in '/home/david/github/pastastore/examples/notebooks/temp/my_connector/stresses_models'

Solving models

The PastaStore.solve_models() method supports parallel processing.

pstore.solve_models(parallel=True)

Parallel processing using .apply()

Define a function that takes a model name as input and returns a result, in this case the \(R^2\) value of the model.

def rsq(model_name: str) -> float:
    """Compute the R-squared value of a Pastas model."""
    ml = pstore.get_models(model_name)
    return ml.stats.rsq()

We can apply this function to all models in the PastaStore using pstore.apply(). By default, the function is applied sequentially.

pstore.apply("models", rsq, progressbar=True)
head_nb5    0.438129
oseries2    0.931883
head_mw     0.159318
oseries1    0.904487
oseries3    0.030468
dtype: float64

To run the function in parallel, pass parallel=True to pstore.apply().

pstore.apply("models", rsq, progressbar=True, parallel=True)
head_nb5    0.438129
oseries2    0.931883
head_mw     0.159318
oseries1    0.904487
oseries3    0.030468
dtype: float64
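Conceptually, apply() maps a function over the names in a library and collects one result per name. The sketch below is a hypothetical, simplified stand-in (apply_by_name and name_length are invented names, not PastaStore's implementation): it runs sequentially by default and hands the work to a concurrent.futures executor when one is given. Executor.map returns results in input order, so both paths yield the same mapping.

```python
from concurrent.futures import Executor
from typing import Callable, Dict, Iterable, Optional


def apply_by_name(
    names: Iterable[str],
    func: Callable[[str], float],
    executor: Optional[Executor] = None,
) -> Dict[str, float]:
    """Apply func to every name, sequentially or through an executor.

    Hypothetical sketch for illustration, not PastaStore's implementation.
    """
    names = list(names)
    if executor is None:
        # Sequential path: a plain loop in the current process.
        results = [func(name) for name in names]
    else:
        # Parallel path: Executor.map returns results in input order.
        results = list(executor.map(func, names))
    return dict(zip(names, results))


def name_length(name: str) -> float:
    """Toy stand-in for a per-model statistic such as rsq."""
    return float(len(name))
```

With a ProcessPoolExecutor, the parallel call would be made inside an if __name__ == "__main__": block, for example with ProcessPoolExecutor(max_workers=2) as ex: apply_by_name(names, name_length, executor=ex). Because results are keyed by name, both paths agree, just as the two pstore.apply() calls above return identical values.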

Get model statistics

The function pstore.get_statistics also supports parallel processing.

pstore.get_statistics(["rsq", "mae"])
               rsq       mae
head_nb5  0.438129  0.318361
oseries2  0.931883  0.087067
head_mw   0.159318  0.631517
oseries1  0.904487  0.091329
oseries3  0.030468  0.106254

pstore.get_statistics(["rsq", "mae"], parallel=True)
                      rsq       mae
_get_statistics
head_nb5         0.438129  0.318361
oseries2         0.931883  0.087067
head_mw          0.159318  0.631517
oseries1         0.904487  0.091329
oseries3         0.030468  0.106254

Compute prediction intervals

Next, let’s pass a more complex function to apply() and run it in parallel. Here we compute the prediction interval of each model and pass the \(\alpha\) value along through the kwargs argument.

def prediction_interval(model_name, **kwargs):
    """Compute the prediction interval for a Pastas model."""
    ml = pstore.get_models(model_name)
    return ml.solver.prediction_interval(**kwargs)
pstore.apply("models", prediction_interval, kwargs={"alpha": 0.05})
            head_nb5            oseries2             head_mw            oseries1            oseries3
               0.025     0.975     0.025     0.975     0.025     0.975     0.025     0.975     0.025     0.975
1960-04-29       NaN       NaN       NaN       NaN  6.300204  9.406311       NaN       NaN       NaN       NaN
1960-04-30       NaN       NaN       NaN       NaN  6.340113  9.291056       NaN       NaN       NaN       NaN
1960-05-01       NaN       NaN       NaN       NaN  6.197963  9.459711       NaN       NaN       NaN       NaN
1960-05-02       NaN       NaN       NaN       NaN  6.256496  9.436107       NaN       NaN       NaN       NaN
1960-05-03       NaN       NaN       NaN       NaN  6.161201  9.336737       NaN       NaN       NaN       NaN
...              ...       ...       ...       ...       ...       ...       ...       ...       ...       ...
2020-01-17  7.961349  9.682071       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-18  7.975718  9.576222       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-19  7.914376  9.647849       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-20  7.914663  9.710748       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-21  7.947020  9.636971       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN

21817 rows × 10 columns

pstore.apply("models", prediction_interval, kwargs={"alpha": 0.05}, parallel=True)
            head_nb5            oseries2             head_mw            oseries1            oseries3
               0.025     0.975     0.025     0.975     0.025     0.975     0.025     0.975     0.025     0.975
1960-04-29       NaN       NaN       NaN       NaN  6.318763  9.625481       NaN       NaN       NaN       NaN
1960-04-30       NaN       NaN       NaN       NaN  6.157354  9.441837       NaN       NaN       NaN       NaN
1960-05-01       NaN       NaN       NaN       NaN  6.178571  9.421986       NaN       NaN       NaN       NaN
1960-05-02       NaN       NaN       NaN       NaN  6.243335  9.478925       NaN       NaN       NaN       NaN
1960-05-03       NaN       NaN       NaN       NaN  6.237774  9.313179       NaN       NaN       NaN       NaN
...              ...       ...       ...       ...       ...       ...       ...       ...       ...       ...
2020-01-17  7.860393  9.630678       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-18  7.874968  9.671061       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-19  7.913059  9.650184       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-20  7.956231  9.667919       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN
2020-01-21  7.927083  9.712015       NaN       NaN       NaN       NaN       NaN       NaN       NaN       NaN

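21817 rows × 10 columns

The interval bounds differ slightly between the sequential and the parallel run. This is most likely because Pastas estimates the prediction interval from randomly drawn parameter samples, so the values vary from run to run.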
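The kwargs argument forwards keyword arguments such as alpha to the applied function; with alpha = 0.05 the interval is bounded by the 0.025 and 0.975 quantiles, the column labels in the prediction interval tables. A process pool maps a one-argument callable, so one way to forward keywords, sketched below as an assumption rather than PastaStore's actual code, is to bind them with functools.partial. The names apply_with_kwargs and interval_levels are hypothetical.

```python
from functools import partial
from typing import Any, Callable, Dict, Iterable


def interval_levels(name: str, alpha: float = 0.05) -> tuple:
    """Toy stand-in for prediction_interval: the lower and upper quantiles."""
    return (alpha / 2, 1 - alpha / 2)


def apply_with_kwargs(
    names: Iterable[str],
    func: Callable[..., Any],
    kwargs: Dict[str, Any],
    mapper: Callable = map,
) -> Dict[str, Any]:
    """Call func(name, **kwargs) for every name (hypothetical sketch).

    mapper can be the built-in map or an executor's map method.
    """
    names = list(names)
    # Bind the keyword arguments once; the result only needs the name.
    # A partial object can be pickled if func and the keyword values can
    # (for example, a function defined at module level).
    bound = partial(func, **kwargs)
    return dict(zip(names, mapper(bound, names)))
```

Passing mapper=executor.map switches to parallel execution without changing how the keywords are forwarded.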

Get signatures

The function pstore.get_signatures does not support parallel processing itself, but it can be combined with pstore.apply to run in parallel.

signatures = [
    "cv_period_mean",
    "cv_date_min",
    "cv_date_max",
    "cv_fall_rate",
    "cv_rise_rate",
]
pstore.get_signatures(signatures=signatures)
                 head_nb5   oseries2    head_mw   oseries1   oseries3
cv_period_mean   0.061879   0.015199   0.145062   0.013066   0.029168
cv_date_min      0.246021   0.128636   0.254627   0.145884   1.394852
cv_date_max      1.262425   0.722945   1.083929   0.300328   0.444442
cv_fall_rate    -1.136450  -0.722718  -1.430200  -0.744797  -1.032837
cv_rise_rate     1.259450   0.836678   1.097257   0.862981   0.931181

pstore.apply(
    "oseries", pstore.get_signatures, kwargs={"signatures": signatures}, parallel=True
)
get_signatures   head_nb5   oseries2    head_mw   oseries1   oseries3
cv_period_mean   0.061879   0.015199   0.145062   0.013066   0.029168
cv_date_min      0.246021   0.128636   0.254627   0.145884   1.394852
cv_date_max      1.262425   0.722945   1.083929   0.300328   0.444442
cv_fall_rate    -1.136450  -0.722718  -1.430200  -0.744797  -1.032837
cv_rise_rate     1.259450   0.836678   1.097257   0.862981   0.931181

Load models

Load models, first sequentially and then in parallel.

pstore.apply("models", pstore.get_models, fancy_output=True)
{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=False),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=False),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=False),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=False),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=False)}

The max_workers keyword argument sets the number of worker processes that are spawned. The default value is usually fine, but it can be set explicitly.

The following works for PasConnector. For ArcticDBConnector, see the workaround further below.

pstore.apply(
    "models", pstore.get_models, fancy_output=True, parallel=True, max_workers=5
)
{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=False),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=False),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=False),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=False),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=False)}
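If you set max_workers yourself, a simple rule of thumb is to use no more workers than there are CPUs or tasks; with the five example models, max_workers=5 is already the upper limit. The helper below is a hypothetical sketch of that rule (choose_max_workers is an invented name, not PastaStore's default). For comparison, concurrent.futures.ProcessPoolExecutor defaults to the number of processors on the machine when max_workers is not given.

```python
import os
from typing import Optional


def choose_max_workers(n_tasks: int, requested: Optional[int] = None) -> int:
    """Worker count: the request (or CPU count), capped at the number of tasks.

    Hypothetical rule of thumb, not PastaStore's default.
    """
    # os.cpu_count() can return None, so fall back to a single worker.
    limit = requested if requested is not None else (os.cpu_count() or 1)
    # More workers than tasks only adds process start-up overhead.
    return max(1, min(limit, n_tasks))
```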

Storing models in parallel

Note

This section is mainly intended for developers: it documents why and how delayed updating of the model links was implemented.

We build and solve the time series models in two steps, first without a noise model and then with one, and store the result. To start from scratch, we first empty the models library.

pstore.empty_library("models", prompt=False, progressbar=False)
Emptied library models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library oseries_models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library stresses_models in my_connector: <class 'pastastore.connectors.PasConnector'>
def two_step_solve(name):
    """Solve a Pastas model in two steps: first without noise model, then with."""
    ml = pstore.create_model(name)
    ml.solve(report=False)
    ml.add_noisemodel(ps.ArNoiseModel())
    ml.solve(initial=False, report=False)
    pstore.add_model(ml, overwrite=True)

In the first example, we apply the function in parallel. After the parallel apply, a separate recomputation updates the links between the time series names and the models.

The conn._added_models attribute keeps track of added models so that the model links can be updated once all models have been added. In parallel mode, each worker process works on its own copy of this attribute, so the list in the main process is not updated.

pstore.conn._added_models
[]
pstore.apply("oseries", two_step_solve, parallel=True, max_workers=2)

As expected, the list remains empty:

pstore.conn._added_models
[]
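The empty list can be explained without starting any processes. Each worker process has its own memory, so changes made in a worker are not visible in the main process; task arguments and results, for instance, travel between processes as pickles. The hypothetical sketch below mimics this with a pickle round trip; worker_adds_model and tracker are invented names, with tracker standing in for conn._added_models.

```python
import pickle
from typing import Dict, List


def worker_adds_model(
    state: Dict[str, List[str]], model_name: str
) -> Dict[str, List[str]]:
    """Simulate a worker process that records an added model.

    Crossing a process boundary means pickling, so the worker gets a copy.
    """
    local_state = pickle.loads(pickle.dumps(state))
    local_state["added_models"].append(model_name)
    # Only an explicitly returned result travels back to the parent.
    return local_state


# Hypothetical stand-in for conn._added_models in the main process.
tracker: Dict[str, List[str]] = {"added_models": []}
returned = [worker_adds_model(tracker, name) for name in ["oseries1", "oseries2"]]
```

The tracker in the "main process" stays empty while each simulated worker only sees its own addition, which is why the model links are recomputed after a parallel apply instead of relying on _added_models.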

The parallel apply automatically updates the model links libraries, so both update flags should be False.

# check that the update flags were reset after the parallel apply updated the model links
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")
pstore.conn._oseries_links_need_update.value = False
pstore.conn._stresses_links_need_update.value = False

Let’s check the oseries_models result:

pstore.oseries_models
{'head_nb5': ['head_nb5'],
 'oseries2': ['oseries2'],
 'head_mw': ['head_mw'],
 'oseries1': ['oseries1'],
 'oseries3': ['oseries3']}

Next, we repeat the process with parallel=False. This time the _added_models attribute is updated properly, because everything runs in the main process with a single PastaStore instance.

Once again, we empty the models library to start fresh.

pstore.empty_library("models", prompt=False, progressbar=False)
Emptied library models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library oseries_models in my_connector: <class 'pastastore.connectors.PasConnector'>
Emptied library stresses_models in my_connector: <class 'pastastore.connectors.PasConnector'>
pstore.apply("oseries", two_step_solve, parallel=False)

The _added_models attribute should now contain the names of all 5 models.

pstore.conn._added_models
['head_nb5', 'oseries2', 'head_mw', 'oseries1', 'oseries3']

The update flags should now be True, which triggers an update the next time the out-of-date data is accessed.

# check the update flags after the sequential apply
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")
pstore.conn._oseries_links_need_update.value = True
pstore.conn._stresses_links_need_update.value = True

Now let’s trigger the update by accessing oseries_models. This updates the database, empties the _added_models attribute and sets the update flags to False.

pstore.oseries_models
{'head_nb5': ['head_nb5'],
 'oseries2': ['oseries2'],
 'head_mw': ['head_mw'],
 'oseries1': ['oseries1'],
 'oseries3': ['oseries3']}
pstore.conn._added_models
[]
# check that the update flags were reset after accessing oseries_models
print(f"{pstore.conn._oseries_links_need_update.value = }")
print(f"{pstore.conn._stresses_links_need_update.value = }")
pstore.conn._oseries_links_need_update.value = False
pstore.conn._stresses_links_need_update.value = False
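The mechanism in this section amounts to a dirty flag with lazy recomputation. The class below is a simplified, hypothetical sketch of that idea (LinkCache and its methods are invented names, not PastaStore's code): adding a model records its name and sets a flag, and reading oseries_models applies the pending update only when the flag is set. Models written without touching the tracker, as happens in worker processes, are only picked up by a full rebuild.

```python
from typing import Dict, List


class LinkCache:
    """Hypothetical sketch of lazily updated oseries -> model links."""

    def __init__(self) -> None:
        self.models: Dict[str, str] = {}  # model name -> oseries name ("database")
        self._links: Dict[str, List[str]] = {}
        self._added_models: List[str] = []
        self._links_need_update = False

    def add_model(self, model_name: str, oseries_name: str) -> None:
        """Store a model and postpone updating the links."""
        self.models[model_name] = oseries_name
        self._added_models.append(model_name)
        self._links_need_update = True

    def _update_added_links(self) -> None:
        """Incremental update: only process models added since the last update."""
        for model_name in self._added_models:
            linked = self._links.setdefault(self.models[model_name], [])
            if model_name not in linked:
                linked.append(model_name)
        self._added_models.clear()
        self._links_need_update = False

    def rebuild_links(self) -> None:
        """Full recomputation from all stored models (e.g. after a parallel run)."""
        links: Dict[str, List[str]] = {}
        for model_name, oseries_name in self.models.items():
            links.setdefault(oseries_name, []).append(model_name)
        self._links = links
        self._added_models.clear()
        self._links_need_update = False

    @property
    def oseries_models(self) -> Dict[str, List[str]]:
        """Accessing the links triggers the deferred update when flagged."""
        if self._links_need_update:
            self._update_added_links()
        return self._links
```

In this sketch the sequential path relies on _added_models, while a parallel run has to call rebuild_links(), mirroring the recomputation that follows a parallel apply in this notebook.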

ArcticDBConnector workaround

For ArcticDBConnector, the underlying database connection objects cannot be pickled, but Python’s multiprocessing requires pickling to send objects to worker processes. Therefore, methods of the PastaStore or ArcticDBConnector classes cannot be passed directly to apply() in parallel mode.

The workaround: the _parallel() method uses an initializer that creates a new connector instance in each worker process and stores it in a global variable named conn. Your custom functions can then use this connector to retrieve data from the database.

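This is a common Python pattern for using unpicklable objects with multiprocessing. See the Python documentation on the initializer and initargs arguments of multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor for more details.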

Note: if you need methods from the PastaStore class, create a new PastaStore from the global connector: PastaStore(conn).

Example: Write a simple function that uses the global conn variable to access the database:

# Simple function to get models from database
def get_model(model_name):
    """Get model using global connector (ArcticDBConnector workaround).

    The global 'conn' variable is set by the _parallel() initializer
    in each worker process, providing access to an ArcticDBConnector instance.
    """
    pstore = pst.PastaStore(conn)
    return pstore.get_model(model_name)
pstore.apply("models", get_model, fancy_output=True, parallel=True, max_workers=5)
{'head_nb5': Model(oseries=head_nb5, name=head_nb5, constant=True, noisemodel=True),
 'oseries2': Model(oseries=oseries2, name=oseries2, constant=True, noisemodel=True),
 'head_mw': Model(oseries=head_mw, name=head_mw, constant=True, noisemodel=True),
 'oseries1': Model(oseries=oseries1, name=oseries1, constant=True, noisemodel=True),
 'oseries3': Model(oseries=oseries3, name=oseries3, constant=True, noisemodel=True)}

Clean up

Clean up temporary pastastore.

pst.util.delete_pastastore(pstore)
Deleting PasConnector database: 'my_connector' ... 
Done!