{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Advanced execution\n", "\n", "One of the key design features of Pydra is the separation between the parameterisation of\n", "the task to be executed, and the parameresiation of where and how the task should be\n", "executed (e.g. on the cloud, on a HPC cluster, ...). This tutorial steps you through\n", "some of the available options for executing a task.\n", "\n", "[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/nipype/pydra-tutorial/develop/notebooks/tutorial/advanced_execution.ipynb)\n", "\n", "Remember that before attempting to run multi-process code in Jupyter notebooks, the\n", "following snippet must be called" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import nest_asyncio\n", "\n", "nest_asyncio.apply()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submitter\n", "\n", "If you want to access a richer `Result` object you can use a Submitter object to initiate\n", "the task execution. For example, using the `TenToThePower` task from the testing package" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.engine.submitter import Submitter\n", "from pydra.tasks.testing import TenToThePower\n", "\n", "\n", "ten_to_the_power = TenToThePower(p=3)\n", "\n", "with Submitter() as submitter:\n", " result = submitter(ten_to_the_power)\n", "\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `Result` object contains\n", "\n", "* `output`: the outputs of the task (if there is only one output it is called `out` by default)\n", "* `runtime`: information about the peak memory and CPU usage\n", "* `errored`: the error status of the task\n", "* `task`: the task object that generated the results\n", "* `cache_dir`: the output directory the results are stored in" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workers\n", "\n", "Pydra supports several workers with which to execute tasks\n", "\n", "- `debug` (default)\n", "- `cf`\n", "- `slurm`\n", "- `sge`\n", "- `psij`\n", "- `dask` (experimental)\n", "\n", "By default, the *debug* worker is used, which runs tasks serially in a single process\n", "without use of the `asyncio` module. This makes it easier to debug errors in workflows\n", "and python tasks, however, when using in Pydra in production you will typically want to\n", "parallelise the execution for efficiency.\n", "\n", "If running on a local workstation, then the `cf` (*ConcurrentFutures*) worker is a good\n", "option because it is able to spread the tasks to be run over multiple processes and\n", "maximise CPU usage.\n", "\n", "If you have access to a high-performance cluster (HPC) then\n", "the [SLURM](https://slurm.schedmd.com/documentation.html) and\n", "[SGE](https://www.metagenomics.wiki/tools/hpc-sge) and [PSI/J](https://exaworks.org/psij)\n", "workers can be used to submit each workflow node as separate jobs to the HPC scheduler.\n", "There is also an experimental [Dask](https://www.dask.org/) worker, which provides a\n", "range of execution backends to choose from.\n", "\n", "To specify a worker, the abbreviation can be passed either as a string or using the\n", "class itself. Additional parameters can be passed to the worker initialisation as keyword\n", "arguments to the execution call. 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workers\n", "\n", "Pydra supports several workers with which to execute tasks:\n", "\n", "- `debug` (default)\n", "- `cf`\n", "- `slurm`\n", "- `sge`\n", "- `psij`\n", "- `dask` (experimental)\n", "\n", "By default, the *debug* worker is used, which runs tasks serially in a single process\n", "without use of the `asyncio` module. This makes it easier to debug errors in workflows\n", "and Python tasks; however, when using Pydra in production you will typically want to\n", "parallelise the execution for efficiency.\n", "\n", "If running on a local workstation, then the `cf` (*ConcurrentFutures*) worker is a good\n", "option because it is able to spread the tasks to be run over multiple processes and\n", "maximise CPU usage.\n", "\n", "If you have access to a high-performance computing (HPC) cluster, then the\n", "[SLURM](https://slurm.schedmd.com/documentation.html),\n", "[SGE](https://www.metagenomics.wiki/tools/hpc-sge) and [PSI/J](https://exaworks.org/psij)\n", "workers can be used to submit each workflow node as a separate job to the HPC scheduler.\n", "There is also an experimental [Dask](https://www.dask.org/) worker, which provides a\n", "range of execution backends to choose from.\n", "\n", "To specify a worker, the abbreviation can be passed either as a string or using the\n", "class itself. Additional parameters can be passed to the worker initialisation as keyword\n", "arguments to the execution call. For example, if we wanted to run five tasks using the\n", "ConcurrentFutures worker but only use three CPUs, we can pass `n_procs=3` to the execution\n", "call.\n", "\n", "Remember that when calling multi-process code in a top-level script the call must be\n", "enclosed within an `if __name__ == \"__main__\"` block to allow the worker processes to\n", "import the module without re-executing it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "\n", "cache_root = tempfile.mkdtemp()\n", "\n", "if __name__ == \"__main__\":\n", "\n", "    ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])\n", "\n", "    # Run the 5 tasks in parallel split across 3 processes\n", "    outputs = ten_to_the_power(worker=\"cf\", n_procs=3, cache_root=cache_root)\n", "\n", "    p1, p2, p3, p4, p5 = outputs.out\n", "\n", "    print(f\"10^5 = {p5}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternatively, the worker object can be initialised in the calling code and passed directly to the execution call" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.workers import cf\n", "\n", "ten_to_the_power = TenToThePower().split(p=[6, 7, 8, 9, 10])\n", "\n", "# Run the 5 tasks in parallel split across 3 processes\n", "outputs = ten_to_the_power(worker=cf.Worker(n_procs=3))\n", "\n", "p6, p7, p8, p9, p10 = outputs.out\n", "\n", "print(f\"10^10 = {p10}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Reusing previously generated results\n", "\n", "Pydra caches all task results in the runtime cache (see [File-system locations](./1-getting-started.html#File-system-locations))\n", "as long as the hashes of the inputs provided to the task are exactly the same. Here we\n", "go through some of the practicalities of this caching and hashing (see\n", "[Caches and hashes](../explanation/hashing-caching.html) for more details and issues\n", "to consider)."
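] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a minimal illustration (a sketch reusing the `TenToThePower` task and the\n", "`cache_root` directory created above), re-executing a task with an identical\n", "parameterisation reuses the result cached by the first run" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.engine.submitter import Submitter\n", "from pydra.tasks.testing import TenToThePower\n", "\n", "with Submitter(cache_root=cache_root) as submitter:\n", "    # First run executes the task and populates the cache\n", "    result_a = submitter(TenToThePower(p=3))\n", "    # Second run has identical input hashes, so the cached result is reused\n", "    result_b = submitter(TenToThePower(p=3))\n", "\n", "# Both results point at the same cache directory\n", "assert result_b.cache_dir == result_a.cache_dir"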
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we attempt to run the same task with the same parameterisation the cache directory\n", "will point to the same location and the results will be reused" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from copy import copy\n", "from pathlib import Path\n", "import tempfile\n", "from pprint import pprint\n", "from fileformats.medimage import Nifti1\n", "from pydra.engine.submitter import Submitter\n", "from pydra.tasks.mrtrix3.v3_0 import MrGrid\n", "\n", "# Make a temporary directory\n", "test_dir = Path(tempfile.mkdtemp())\n", "nifti_dir = test_dir / \"nifti\"\n", "nifti_dir.mkdir()\n", "\n", "# Generate some random NIfTI files to work with\n", "nifti_files = [Nifti1.sample(nifti_dir, seed=i) for i in range(10)]\n", "\n", "VOX_SIZES = [\n", " (0.5, 0.5, 0.5),\n", " (0.25, 0.25, 0.25),\n", " (0.1, 0.1, 0.1),\n", " (0.35, 0.35, 0.35),\n", " (0.1, 0.1, 0.1),\n", " (0.5, 0.5, 0.5),\n", " (0.25, 0.25, 0.25),\n", " (0.2, 0.2, 0.2),\n", " (0.35, 0.35, 0.35),\n", " (0.1, 0.1, 0.1),\n", "]\n", "\n", "mrgrid_varying_vox = MrGrid(operation=\"regrid\").split(\n", " (\"in_file\", \"voxel\"),\n", " in_file=nifti_files,\n", " voxel=VOX_SIZES,\n", ")\n", "\n", "submitter = Submitter(cache_root=test_dir / \"cache\")\n", "\n", "\n", "with submitter:\n", " result1 = submitter(mrgrid_varying_vox)\n", "\n", "\n", "mrgrid_varying_vox2 = MrGrid(operation=\"regrid\").split(\n", " (\"in_file\", \"voxel\"),\n", " in_file=nifti_files,\n", " voxel=copy(VOX_SIZES),\n", ")\n", "\n", "# Result from previous run is reused as the task and inputs are identical\n", "with submitter:\n", " result2 = submitter(mrgrid_varying_vox2)\n", "\n", "# Check that the output directory is the same for both runs\n", "assert result2.cache_dir == result1.cache_dir\n", "\n", "# Change the voxel sizes to resample the NIfTI files to for one of the files\n", "mrgrid_varying_vox2.voxel[2] = [0.25]\n", "\n", "# Result from previous run is reused as the task and inputs are identical\n", "with submitter:\n", " result3 = submitter(mrgrid_varying_vox2)\n", "\n", "# The output directory will be different as the inputs are now different\n", "assert result3.cache_dir != result1.cache_dir" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that for file objects, the contents of the files are used to calculate the hash\n", "not their paths. Therefore, when inputting large files there might be some additional\n", "overhead on the first run (the file hashes themselves are cached by path and mtime so\n", "shouldn't need to be recalculated unless they are modified). However, this makes the\n", "hashes invariant to file-system movement. For example, changing the name of one of the\n", "files in the nifti directory won't invalidate the hash." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Rename a NIfTI file within the test directory\n", "nifti_files[0] = Nifti1(\n", " nifti_files[0].fspath.rename(nifti_files[0].fspath.with_name(\"first.nii\"))\n", ")\n", "\n", "mrgrid_varying_vox3 = MrGrid(operation=\"regrid\").split(\n", " (\"in_file\", \"voxel\"),\n", " in_file=nifti_files,\n", " voxel=VOX_SIZES,\n", ")\n", "\n", "# Result from previous run is reused as contents of the files have not changed, despite\n", "# the file names changing\n", "with submitter:\n", " result4 = submitter(mrgrid_varying_vox3)\n", "\n", "assert result4.cache_dir == result1.cache_dir\n", "\n", "# Replace the first NIfTI file with a new file\n", "nifti_files[0] = Nifti1.sample(nifti_dir, seed=100)\n", "\n", "# Update the in_file input field to include the new file\n", "mrgrid_varying_vox4 = MrGrid(operation=\"regrid\").split(\n", " (\"in_file\", \"voxel\"),\n", " in_file=nifti_files,\n", " voxel=VOX_SIZES,\n", ")\n", "\n", "# The results from the previous runs are ignored as the files have changed\n", "with submitter:\n", " result4 = submitter(mrgrid_varying_vox4)\n", "\n", "# The cache directory for the new run is different\n", "assert result4.cache_dir != result1.cache_dir" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Environments and hooks\n", "\n", "For shell tasks, it is possible to specify that the command runs within a specific\n", "software environment, such as those provided by software containers (e.g. Docker or Singularity/Apptainer).\n", "This is down by providing the environment to the submitter/execution call," ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "from pydra.tasks.mrtrix3.v3_0 import MrGrid\n", "from pydra.environments import docker\n", "\n", "test_dir = tempfile.mkdtemp()\n", "\n", "nifti_file = Nifti1.sample(test_dir, seed=0)\n", "\n", "# Instantiate the task, \"splitting\" over all NIfTI files in the test directory\n", "# by splitting the \"input\" input field over all files in the directory\n", "mrgrid = MrGrid(in_file=nifti_file, operation=\"regrid\", voxel=(0.5, 0.5, 0.5))\n", "\n", "# Run the task to resample all NIfTI files\n", "outputs = mrgrid(environment=docker.Environment(image=\"mrtrix3/mrtrix3\", tag=\"latest\"))\n", "\n", "# Print the locations of the output files\n", "pprint(outputs.out_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Of course for this to work Docker needs to work and be configured for\n", "[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/).\n", "See [Containers and Environments](../explanation/environments.rst) for more details on\n", "how to utilise containers and add support for other software environments.\n", "\n", "It is also possible to specify functions to run at hooks that are immediately before and after\n", "the task is executed by passing a `pydra.engine.spec.TaskHooks` object to the `hooks`\n", "keyword arg. The callable should take the `pydra.engine.core.Job` object as its only\n", "argument and return None. 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is also possible to specify functions to run at hooks immediately before and after\n", "the task is executed by passing a `pydra.engine.hooks.TaskHooks` object to the `hooks`\n", "keyword arg. The callable should take the `pydra.engine.job.Job` object as its first\n", "argument (as the example below shows, post-run hooks also receive the\n", "`pydra.engine.result.Result` object) and return None. The available hooks to attach\n", "functions to are:\n", "\n", "* `pre_run`: before the task cache directory is created\n", "* `pre_run_task`: after the cache directory has been created and the inputs resolved, but before the task is executed\n", "* `post_run_task`: after the task has been run and the outputs collected\n", "* `post_run`: after the cache directory has been finalised" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import platform\n", "from pydra.engine.job import Job\n", "from pydra.engine.hooks import TaskHooks\n", "from pydra.engine.result import Result\n", "\n", "\n", "def notify_task_completion(task: Job, result: Result):\n", "    # Print a message to the terminal\n", "    print(f\"Job completed! Results are stored in {str(task.cache_dir)!r}\")\n", "\n", "    # Platform-specific notifications\n", "    if platform.system() == \"Darwin\":  # macOS\n", "        os.system(\n", "            'osascript -e \\'display notification \"Job has completed successfully!\" '\n", "            'with title \"Job Notification\"\\''\n", "        )\n", "    elif platform.system() == \"Linux\":  # Linux\n", "        os.system('notify-send \"Job Notification\" \"Job has completed successfully!\"')\n", "    elif platform.system() == \"Windows\":  # Windows\n", "        os.system('msg * \"Job has completed successfully!\"')\n", "\n", "\n", "# Rerun the mrgrid task in a fresh cache root with a completion notification attached\n", "outputs = mrgrid(\n", "    hooks=TaskHooks(post_run=notify_task_completion), cache_root=tempfile.mkdtemp()\n", ")\n", "\n", "# Print the location of the output file\n", "pprint(outputs.out_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Provenance and auditing\n", "\n", "Work in progress..." ] } ], "metadata": { "kernelspec": { "display_name": "wf13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 2 }