{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Getting started\n", "\n", "The basic runnable component of Pydra is a *task*. Tasks are conceptually similar to\n", "functions, in that they take inputs, operate on them and then return results. However,\n", "unlike functions, tasks are parameterised before they are executed in a separate step.\n", "This enables parameterised tasks to be linked together into workflows that are checked for\n", "errors before they are executed, and modular execution workers and environments to specified\n", "independently of the task being performed.\n", "\n", "Tasks can encapsulate Python functions or shell-commands, or be multi-component workflows,\n", "themselves constructed from task components including nested workflows.\n", "\n", "## Preparation\n", "\n", "Before we get started, lets set up some test data to play with. Here we create a sample\n", "JSON file in a temporary directory" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "from tempfile import mkdtemp\n", "from pprint import pprint\n", "import json\n", "\n", "JSON_CONTENTS = {\"a\": True, \"b\": \"two\", \"c\": 3, \"d\": [7, 0.55, 6]}\n", "\n", "test_dir = Path(mkdtemp())\n", "json_file = test_dir / \"test.json\"\n", "with open(json_file, \"w\") as f:\n", " json.dump(JSON_CONTENTS, f)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we create a directory containing 10 randomly generated [NIfTI](https://nifti.nimh.nih.gov/) files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fileformats.medimage import Nifti1\n", "\n", "nifti_dir = test_dir / \"nifti\"\n", "nifti_dir.mkdir()\n", "\n", "for i in range(10):\n", " Nifti1.sample(nifti_dir, seed=i) # Create a dummy NIfTI file in the dest. directory" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that when you run concurrent processes within a Jupyter notebook the following snippet\n", "is also required" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import nest_asyncio\n", "\n", "nest_asyncio.apply()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Running your first task\n", "\n", "Pydra allows you to install independent packages with pre-defined tasks (e.g., `pydra-fsl`, `pydra-ants`). The task from the packages are installed under the `pydra.tasks.*`. You always have access to `pydra.tasks.common`, in addition `pydra-mrtrix3.v3_0` was also installed for this tutorial. To use a pre-defined task\n", "\n", "* import the class from the `pydra.tasks.*` package it is in\n", "* instantiate it with appropriate parameters\n", "* \"call\" resulting object (i.e. 
"* \"call\" the resulting object (i.e. `my_task(...)`) to execute it as you would a function\n", "\n", "To demonstrate loading a JSON file with the\n", "`pydra.tasks.common.LoadJson` task, we will use the JSON file created in the preparation step above" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can load the JSON contents back from the file using the `LoadJson` task\n", "class" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the task\n", "from pydra.tasks.common import LoadJson\n", "\n", "# Instantiate the task, providing the JSON file we want to load\n", "load_json = LoadJson(file=json_file)\n", "\n", "# Run the task to load the JSON file\n", "outputs = load_json()\n", "\n", "# Access the loaded JSON output contents and check they match the original\n", "assert outputs.out == JSON_CONTENTS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Iterating over inputs\n", "\n", "It is straightforward to apply the same operation over a set of inputs using the `split()`\n", "method. For example, if we wanted to re-grid all the NIfTI images stored in a directory,\n", "such as the sample ones generated in the preparation step above" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we can do so by importing the `MrGrid` shell-command task from the `pydra-mrtrix3` package\n", "and running it over every NIfTI file in the directory using the `Task.split()` method" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.tasks.mrtrix3.v3_0 import MrGrid\n", "\n", "# Instantiate the task, \"splitting\" over all NIfTI files in the test directory\n", "# by splitting the \"in_file\" input field over all files in the directory\n", "mrgrid = MrGrid(operation=\"regrid\", voxel=(0.5, 0.5, 0.5)).split(\n", "    in_file=nifti_dir.iterdir()\n", ")\n", "\n", "# Run the task to resample all NIfTI files\n", "outputs = mrgrid()\n", "\n", "# Print the locations of the output files\n", "pprint(outputs.out_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is also possible to iterate over inputs in pairs/n-tuples. For example, if you wanted to use\n", "different voxel sizes for different images, both the list of images and the voxel sizes\n", "are passed to the `split()` method and their combination is specified by a tuple \"splitter\".\n", "\n", "Note that it is important to use a tuple, not a list, for the splitter definition in this\n", "case, because a list splitter is interpreted as a split over each combination of the inputs\n", "(see [Splitting and combining](../explanation/splitting-combining.html) for more details\n", "on splitters).\n"
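, "\n", "Schematically, as a rough sketch (assuming the list form of the splitter is accepted in the\n", "same position as the tuple form used below, and using a hypothetical task with generic input\n", "fields `a` and `b`):\n", "\n", "```python\n", "task.split((\"a\", \"b\"), a=[1, 2], b=[\"x\", \"y\"])  # paired: (1, \"x\") and (2, \"y\")\n", "task.split([\"a\", \"b\"], a=[1, 2], b=[\"x\", \"y\"])  # every combination: (1, \"x\"), (1, \"y\"), (2, \"x\"), (2, \"y\")\n", "```"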
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mrgrid_varying_vox_sizes = MrGrid(operation=\"regrid\").split(\n", " (\"in_file\", \"voxel\"),\n", " in_file=nifti_dir.iterdir(),\n", " # Define a list of voxel sizes to resample the NIfTI files to,\n", " # the list must be the same length as the list of NIfTI files\n", " voxel=[\n", " (1.0, 1.0, 1.0),\n", " (1.0, 1.0, 1.0),\n", " (1.0, 1.0, 1.0),\n", " (0.5, 0.5, 0.5),\n", " (0.75, 0.75, 0.75),\n", " (0.5, 0.5, 0.5),\n", " (0.5, 0.5, 0.5),\n", " (1.0, 1.0, 1.0),\n", " (1.25, 1.25, 1.25),\n", " (1.25, 1.25, 1.25),\n", " ],\n", ")\n", "\n", "outputs = mrgrid_varying_vox_sizes()\n", "\n", "pprint(outputs.out_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Executing tasks in parallel\n", "\n", "By default, Pydra will use the *debug* worker, which executes each task sequentially.\n", "This makes it easier to debug tasks and workflows, however, in most cases, once a workflow\n", "is tested, a concurrent worker is preferable so tasks can be executed in parallel\n", "(see [Workers](./3-advanced-execution.html#Workers)). To use multiple processes on a\n", "workstation, select the `cf` worker option when executing the task/workflow. Additional\n", "keyword arguments, will be passed to the worker initialisation (e.g. `n_procs=4`).\n", "\n", "Note that when multiprocessing in Python on Windows and macOS (and good practice on Linux/POSIX\n", "OSs for compatibility), you need to place a `if __name__ == \"__main__\"` block when\n", "executing in top-level scripts to allow the script to be imported, but not executed,\n", "by subprocesses." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.tasks.mrtrix3.v3_0 import MrGrid\n", "\n", "if (\n", " __name__ == \"__main__\"\n", "): # <-- Add this block to allow the script to imported by subprocesses\n", " mrgrid = MrGrid(operation=\"regrid\", voxel=(0.5, 0.5, 0.5)).split(\n", " in_file=nifti_dir.iterdir()\n", " )\n", " outputs = mrgrid(worker=\"cf\", n_procs=4) # <-- Select the \"cf\" worker here\n", " print(\"\\n\".join(str(p) for p in outputs.out_file))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## File-system locations\n", "\n", "Output and intermediate files are typically generated during the course of a workflow/task run.\n", "In addition to this, Pydra generates a cache directory for each task, in which\n", "the task, results and any errors are stored in [cloudpickle](https://github.com/cloudpipe/cloudpickle)\n", "files for future reference (see [Troubleshooting](./troubleshooting.html)).\n", "By default, these cache directories are stored in a platform-specific application-cache\n", "directory\n", "\n", "* Windows: `C:\\Users\\\\AppData\\Local\\pydra\\\\run-cache`\n", "* Linux: `/home//.cache/pydra//run-cache`\n", "* macOS: `/Users//Library/Caches/pydra//run-cache`\n", "\n", "When a task runs, a unique hash is generated by the combination of all the inputs to the\n", "task and the operation to be performed. This hash is used to name the task cache directory\n", "within the specified cache root. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "When a task runs, a unique hash is generated from the combination of all the inputs to the\n", "task and the operation to be performed. This hash is used to name the task cache directory\n", "within the specified cache root. Therefore, if you use the same cache\n", "root and the same task is executed with the same inputs in a subsequent run,\n", "the path of its cache directory will be the same, and if Pydra finds\n", "existing results at that path, the outputs generated by the previous run will be\n", "reused.\n", "\n", "This cache will grow as more tasks are run, so care needs to be taken to ensure\n", "there is enough space on the target disk. To use a different location for this cache,\n", "simply provide the `cache_root` keyword argument to the execution call" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "outputs = mrgrid(cache_root=Path(\"~/pydra-cache\").expanduser())\n", "\n", "pprint(outputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To check alternative cache roots for existing results, while still storing any newly generated\n", "task cache directories in the specified cache root, the `readonly_caches` keyword argument can be used" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.utils.general import default_run_cache_root\n", "\n", "my_cache_root = Path(\"~/new-pydra-cache\").expanduser()\n", "my_cache_root.mkdir(exist_ok=True)\n", "\n", "outputs = mrgrid(cache_root=my_cache_root, readonly_caches=[default_run_cache_root])\n", "\n", "print(outputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "wf13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 2 }