{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Troubleshooting\n", "\n", "This tutorial steps through tecnhiques to identify errors and pipeline failures, as well\n", "as avoid common pitfalls setting up executing over multiple processes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Things to check if Pydra gets stuck\n", "\n", "I There are a number of common gotchas, related to running multi-process code, that can\n", "cause Pydra workflows to get stuck and not execute correctly. If using the concurrent\n", "futures worker (e.g. `worker=\"cf\"`), check these issues first before filing a bug report\n", "or reaching out for help.\n", "\n", "### Applying `nest_asyncio` when running within a notebook\n", "\n", "When using the concurrent futures worker within a Jupyter notebook you need to apply\n", "`nest_asyncio` with the following lines" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This is needed to run parallel workflows in Jupyter notebooks\n", "import nest_asyncio\n", "\n", "nest_asyncio.apply()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enclosing multi-process code within `if __name__ == \"__main__\"`\n", "\n", "When running multi-process Python code on macOS or Windows, as is the case when the \n", "concurrent futures worker is selected (i.e. `worker=\"cf\"`), then scripts that execute\n", "the forking code need to be enclosed within an `if __name__ == \"__main__\"` block, e.g." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.tasks.testing import UnsafeDivisionWorkflow\n", "from pydra.engine.submitter import Submitter\n", "\n", "# This workflow will fail because we are trying to divide by 0\n", "wf = UnsafeDivisionWorkflow(a=10, b=5, denominator=2)\n", "\n", "if __name__ == \"__main__\":\n", " with Submitter(worker=\"cf\") as sub:\n", " result = sub(wf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This allows the secondary processes to import the script without executing it. Without\n", "such a block Pydra will lock up and not process the workflow. On Linux this is not an\n", "issue due to the way that processes are forked, but is good practice in any case for\n", "code portability." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Removing stray lockfiles\n", "\n", "When a Pydra task is executed, a lockfile is generated to signify that the task is running.\n", "Other processes will wait for this lock to be released before attempting to access the\n", "tasks results. The lockfiles are automatically deleted after a task completes, either\n", "successfully or with an error, within a *try/finally* block so should run most of the time.\n", "However, if a task/workflow is terminated by an interactive\n", "debugger, the finally block may not be executed, leaving stray lockfiles. This\n", "can cause the Pydra to hang waiting for the lock to be released. If you suspect this to be\n", "an issue, and there are no other jobs running, then simply remove all lock files from your\n", "cache directory (e.g. `rm /*.lock`) and re-submit your job.\n", "\n", "If the `clean_stale_locks` flag is set (by default when using the *debug* worker), locks that\n", "were created before the outer task was submitted are removed before the task is run.\n", "However, since these locks could be created by separate submission processes, `clean_stale_locks`\n", "is not switched on by default when using production workers (e.g. `cf`, `slurm`, etc...)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Inspecting errors\n", "\n", "### Running in *debug* mode\n", "\n", "By default, Pydra will run with the *debug* worker, which executes each task serially\n", "within a single process without use of `async/await` blocks, to allow raised exceptions\n", "to propagate gracefully to the calling code. If you are having trouble with a pipeline,\n", "ensure that `worker=debug` is passed to the submission/execution call (the default).\n", "\n", "### Reading error files\n", "\n", "When a task raises an error, it is captured and saved in pickle file named `_error.pklz`\n", "within task's cache directory. For example, when calling the toy `UnsafeDivisionWorkflow`\n", "with a `denominator=0`, the task will fail." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This workflow will fail because we are trying to divide by 0\n", "wf = UnsafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])\n", "\n", "if __name__ == \"__main__\":\n", " try:\n", " with Submitter(worker=\"cf\") as sub:\n", " result = sub(wf)\n", " except Exception as e:\n", " print(e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The error pickle files can be loaded using the `cloudpickle` library, noting that it is\n", "important to use the same Python version to load the files that was used to run the Pydra\n", "workflow" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.utils.general import default_run_cache_root\n", "import cloudpickle as cp\n", "from pprint import pprint\n", "from pydra.tasks.testing import Divide\n", "\n", "with open(\n", " default_run_cache_root / Divide(x=15, y=0)._checksum / \"_error.pklz\", \"rb\"\n", ") as f:\n", " error = cp.load(f)\n", "\n", "pprint(error)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tracing upstream issues\n", "\n", "Failures are common in scientific analysis, even for well tested workflows, due to\n", "the novel nature and of scientific experiments and known artefacts that can occur.\n", "Therefore, it is always to sanity-check results produced by workflows. When a problem\n", "occurs in a multi-stage workflow it can be difficult to identify at which stage the\n", "issue occurred.\n", "\n", "Currently in Pydra you need to step backwards through the tasks of the workflow, load\n", "the saved task object and inspect its inputs to find the preceding nodes. If any of the\n", "inputs that have been generated by previous nodes are not ok, then you should check the\n", "tasks that generated them in turn. For file-based inputs, you should be able to find\n", "the path of the preceding task's cache directory from the provided file path. However,\n", "for non-file inputs you may need to exhaustively iterate through all the task dirs\n", "in your cache root to find the issue.\n", "\n", "For example, in the following example workflow, if a divide by 0 occurs within the division\n", "node of the workflow, then an `float('inf')` will be returned, which will then propagate\n", "through the workflow." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.engine.submitter import Submitter\n", "from pydra.tasks.testing import SafeDivisionWorkflow\n", "\n", "wf = SafeDivisionWorkflow(a=10, b=5).split(denominator=[3, 2, 0])\n", "\n", "if __name__ == \"__main__\":\n", " with Submitter(worker=\"cf\") as sub:\n", " result = sub(wf)\n", "\n", "print(f\"Workflow completed successfully, results saved in: {result.cache_dir}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To find the task directory where the issue first surfaced, iterate through every task\n", "cache dir and check the results for `float(\"inf\")`s" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import cloudpickle as cp\n", "from pydra.utils.general import user_cache_root\n", "\n", "run_cache = user_cache_root / \"run-cache\"\n", "\n", "for task_cache_root in run_cache.iterdir():\n", " with open(task_cache_root / \"_result.pklz\", \"rb\") as f:\n", " result = cp.load(f)\n", " if result.outputs is not None:\n", " for field_name in result.outputs:\n", " if result.outputs[field_name] == float(\"nan\"):\n", " print(\n", " f\"Job {task_cache_root.name!r} produced a NaN value for {field_name!r}\"\n", " )" ] } ], "metadata": { "kernelspec": { "display_name": "wf13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 2 }