Troubleshooting¶
This tutorial steps through techniques for identifying errors and pipeline failures, as well as avoiding common pitfalls when setting up execution over multiple processes.
Things to check if Pydra gets stuck¶
There are a number of common gotchas, related to running multi-process code, that can cause Pydra workflows to get stuck and not execute correctly. If you are using the concurrent futures worker (e.g. worker="cf"), check these issues first before filing a bug report or reaching out for help.
Applying nest_asyncio when running within a notebook¶
When using the concurrent futures worker within a Jupyter notebook, you need to apply nest_asyncio with the following lines:
[1]:
# This is needed to run parallel workflows in Jupyter notebooks
import nest_asyncio
nest_asyncio.apply()
Enclosing multi-process code within if __name__ == "__main__"¶
When running multi-process Python code on macOS or Windows, as is the case when the concurrent futures worker is selected (i.e. worker="cf"), scripts that execute the forking code need to be enclosed within an if __name__ == "__main__" block, e.g.
[2]:
from pydra.tasks.testing import UnsafeDivisionWorkflow
from pydra.engine.submitter import Submitter

# The workflow is defined at module level, but only submitted inside the
# __main__ guard so that worker processes can re-import this script safely
wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)

if __name__ == "__main__":
    with Submitter(worker="cf") as sub:
        result = sub(wf)
This allows the secondary processes to import the script without executing it. Without such a block, Pydra will lock up and not process the workflow. On Linux this is not an issue, due to the way that processes are forked, but it is good practice in any case for code portability.
Removing stray lockfiles¶
When a Pydra task is executed, a lockfile is generated to signify that the task is running. Other processes will wait for this lock to be released before attempting to access the task's results. The lockfiles are automatically deleted after a task completes, either successfully or with an error, within a try/finally block, so this cleanup should run in most cases. However, if a task/workflow is terminated by an interactive debugger, the finally block may not be executed, leaving stray lockfiles. This can cause Pydra to hang while waiting for the lock to be released. If you suspect this to be an issue, and there are no other jobs running, then simply remove all lock files from your cache directory (e.g. rm <your-run-cache-dir>/*.lock) and re-submit your job.
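If you prefer to do this from Python, a minimal sketch along these lines (assuming the default run-cache location imported below) removes any stray lock files; only do this when no other jobs are running against the same cache.
[ ]:
from pydra.utils.general import default_run_cache_root

# Remove stray lock files left behind by an interrupted run
for lock_path in default_run_cache_root.glob("*.lock"):
    print(f"Removing stray lock file: {lock_path}")
    lock_path.unlink()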
If the clean_stale_locks flag is set (the default when using the debug worker), locks that were created before the outer task was submitted are removed before the task is run. However, since these locks could have been created by separate submission processes, clean_stale_locks is not switched on by default when using production workers (e.g. cf, slurm, etc.).
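A hedged sketch of enabling the flag explicitly, assuming clean_stale_locks is exposed as a keyword argument of Submitter (the flag described above); only enable it when you are sure no other submission process is writing to the same cache.
[ ]:
from pydra.engine.submitter import Submitter
from pydra.tasks.testing import UnsafeDivisionWorkflow

wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)

if __name__ == "__main__":
    # Assumes clean_stale_locks is accepted by Submitter (see the note above);
    # stale locks created before this submission would then be cleared
    with Submitter(worker="cf", clean_stale_locks=True) as sub:
        result = sub(wf)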
Inspecting errors¶
Running in debug mode¶
By default, Pydra runs with the debug worker, which executes each task serially within a single process without the use of async/await blocks, allowing raised exceptions to propagate gracefully to the calling code. If you are having trouble with a pipeline, ensure that worker="debug" is passed to the submission/execution call (this is the default).
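For example, a short sketch using the toy UnsafeDivisionWorkflow from above, run with the debug worker so that any exception is raised directly in the calling process:
[ ]:
from pydra.engine.submitter import Submitter
from pydra.tasks.testing import UnsafeDivisionWorkflow

wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)

# The debug worker runs tasks serially in this process, so exceptions surface
# immediately and can be stepped through with a debugger
with Submitter(worker="debug") as sub:
    result = sub(wf)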
Reading error files¶
When a task raises an error, it is captured and saved in a pickle file named _error.pklz within the task's cache directory. For example, when calling the toy UnsafeDivisionWorkflow with denominator=0, the task will fail.
[3]:
# This workflow will fail because we are trying to divide by 0
wf = UnsafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])
if __name__ == "__main__":
    try:
        with Submitter(worker="cf") as sub:
            result = sub(wf)
    except Exception as e:
        print(e)
Task execution failed
Full crash report for 'Split' job is here: /home/runner/.cache/pydra/1.0a0/run-cache/workflow-48785a3059140267a09dfabcd2f1682a/_error.pklz
The error pickle files can be loaded using the cloudpickle library, noting that it is important to load the files with the same Python version that was used to run the Pydra workflow.
[4]:
from pydra.utils.general import default_run_cache_root
import cloudpickle as cp
from pprint import pprint
from pydra.tasks.testing import Divide
with open(
    default_run_cache_root / Divide(x=15, y=0)._checksum / "_error.pklz", "rb"
) as f:
    error = cp.load(f)
pprint(error)
{'error message': ['Traceback (most recent call last):\n',
' File '
'"/usr/share/miniconda/lib/python3.12/site-packages/pydra/engine/job.py", '
'line 333, in run\n'
' self.task._run(self, rerun)\n',
' File '
'"/usr/share/miniconda/lib/python3.12/site-packages/pydra/compose/python.py", '
'line 237, in _run\n'
' returned = self.function(**inputs)\n'
' ^^^^^^^^^^^^^^^^^^^^^^^\n',
' File '
'"/usr/share/miniconda/lib/python3.12/site-packages/pydra/tasks/testing/__init__.py", '
'line 11, in Divide\n'
' return x / y\n'
' ~~^~~\n',
'ZeroDivisionError: float division by zero\n'],
'login name': 'runner',
'name with checksum': 'python-7e3cdc473ae1c9ffcda3f1b64579577b',
'time of crash': '20250401-010415'}
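Since the loaded error is a plain dictionary, the original traceback can be re-printed by joining the lines stored under the 'error message' key shown in the output above:
[ ]:
# Re-print the original traceback from the loaded error dictionary
print("".join(error["error message"]))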
Tracing upstream issues¶
Failures are common in scientific analysis, even for well-tested workflows, due to the novel nature of scientific experiments and the known artefacts that can occur. Therefore, it is always worth sanity-checking the results produced by workflows. When a problem occurs in a multi-stage workflow, it can be difficult to identify at which stage the issue occurred.
Currently in Pydra, you need to step backwards through the tasks of the workflow, load the saved task object, and inspect its inputs to find the preceding nodes. If any of the inputs that have been generated by previous nodes are not OK, then you should check the tasks that generated them in turn. For file-based inputs, you should be able to find the path of the preceding task's cache directory from the provided file path. However, for non-file inputs, you may need to exhaustively iterate through all the task directories in your cache root to find the issue.
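As a rough starting point, a sketch along these lines can be used to see what has been saved in a given task's cache directory and to load any pickled objects it contains (the placeholder path is hypothetical and the exact file names may differ between Pydra versions):
[ ]:
import cloudpickle as cp
from pathlib import Path

# Hypothetical placeholder: substitute the cache directory of the task you are
# investigating
task_dir = Path("<path-to-a-task-cache-dir>")

for path in sorted(task_dir.iterdir()):
    print(path.name)
    if path.suffix == ".pklz":
        with open(path, "rb") as f:
            obj = cp.load(f)
        print(f"  loaded {type(obj).__name__}: {obj!r}")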
For example, in the following workflow, if a division by 0 occurs within the division node, then a float('inf') will be returned, which will then propagate through the rest of the workflow.
[5]:
from pydra.engine.submitter import Submitter
from pydra.tasks.testing import SafeDivisionWorkflow
wf = SafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])
if __name__ == "__main__":
    with Submitter(worker="cf") as sub:
        result = sub(wf)
    print(f"Workflow completed successfully, results saved in: {result.cache_dir}")
Workflow completed successfully, results saved in: /home/runner/.cache/pydra/1.0a0/run-cache/workflow-1e2439ed0771726a4503d3b26efcba41
To find the task directory where the issue first surfaced, iterate through every task cache directory and check the results for non-finite values such as float("inf") or float("nan"):
[6]:
import math

import cloudpickle as cp
from pydra.utils.general import user_cache_root

run_cache = user_cache_root / "run-cache"

for task_cache_root in run_cache.iterdir():
    with open(task_cache_root / "_result.pklz", "rb") as f:
        result = cp.load(f)
    if result.outputs is not None:
        for field_name in result.outputs:
            value = result.outputs[field_name]
            # NaN never compares equal to itself, so use math.isfinite to catch
            # both NaN and inf outputs
            if isinstance(value, float) and not math.isfinite(value):
                print(
                    f"Job {task_cache_root.name!r} produced a non-finite value "
                    f"({value}) for {field_name!r}"
                )