Getting started

The basic runnable component of Pydra is a task. Tasks are conceptually similar to functions, in that they take inputs, operate on them and then return results. However, unlike functions, tasks are parameterised before they are executed in a separate step. This enables parameterised tasks to be linked together into workflows that are checked for errors before they are executed, and modular execution workers and environments to be specified independently of the task being performed.

Tasks can encapsulate Python functions or shell-commands, or be multi-component workflows, themselves constructed from task components including nested workflows.

Preparation

Before we get started, let's set up some test data to play with. Here we create a sample JSON file in a temporary directory.

[1]:
from pathlib import Path
from tempfile import mkdtemp
from pprint import pprint
import json

JSON_CONTENTS = {"a": True, "b": "two", "c": 3, "d": [7, 0.55, 6]}

test_dir = Path(mkdtemp())
json_file = test_dir / "test.json"
with open(json_file, "w") as f:
    json.dump(JSON_CONTENTS, f)

Next we create a directory containing 10 randomly generated NIfTI files

[2]:
from fileformats.medimage import Nifti1

nifti_dir = test_dir / "nifti"
nifti_dir.mkdir()

for i in range(10):
    Nifti1.sample(nifti_dir, seed=i)  # Create a dummy NIfTI file in the destination directory

Note that when you run concurrent processes within a Jupyter notebook, the following snippet is also required

[3]:
import nest_asyncio

nest_asyncio.apply()

Running your first task

Pydra allows you to install independent packages with pre-defined tasks (e.g., pydra-fsl, pydra-ants). Tasks from these packages are installed under the pydra.tasks.* namespace. You always have access to pydra.tasks.common, and pydra-mrtrix3.v3_0 was also installed for this tutorial. To use a pre-defined task

  • import the class from the pydra.tasks.* package it is in

  • instantiate it with appropriate parameters

  • "call" resulting object (i.e. my_task(...)) to execute it as you would a function

To demonstrate, we can load the contents of the JSON file created in the Preparation section using the pydra.tasks.common.LoadJson task class

[4]:
# Import the task
from pydra.tasks.common import LoadJson

# Instantiate the task, providing the JSON file we want to load
load_json = LoadJson(file=json_file)

# Run the task to load the JSON file
outputs = load_json()

# Access the loaded JSON output contents and check they match original
assert outputs.out == JSON_CONTENTS

Iterating over inputs

It is straightforward to apply the same operation over a set of inputs using the split() method. For example, if we wanted to re-grid all the NIfTI images stored in a directory, such as the sample ones generated above, we can import the MrGrid shell-command task from the pydra-mrtrix3 package and run it over every NIfTI file in the directory using the Task.split() method

[5]:
from pydra.tasks.mrtrix3.v3_0 import MrGrid

# Instantiate the task, "splitting" over all NIfTI files in the test directory
# by splitting the "input" input field over all files in the directory
mrgrid = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
    in_file=nifti_dir.iterdir()
)

# Run the task to resample all NIfTI files
outputs = mrgrid()

# Print the locations of the output files
pprint(outputs.out_file)
[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')]

It is also possible to iterate over inputs in pairs/n-tuples. For example, if you wanted to use different voxel sizes for different images, both the list of images and the voxel sizes are passed to the split() method and their combination is specified by a tuple "splitter"

Note that it is important to use a tuple not a list for the splitter definition in this case, because a list splitter is interpreted as the split over each combination of inputs (see Splitting and combining for more details on splitters).
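The distinction between the two splitter types can be illustrated with plain Python: a tuple splitter behaves like zip(), pairing the inputs element-wise, whereas a list splitter behaves like itertools.product(), generating every combination. This is only an analogy to clarify the semantics, not Pydra's internal implementation; the file names and voxel sizes below are made-up examples.

```python
from itertools import product

# Hypothetical inputs: three image files and three voxel sizes
images = ["a.nii", "b.nii", "c.nii"]
voxels = [(1.0, 1.0, 1.0), (0.5, 0.5, 0.5), (0.75, 0.75, 0.75)]

# Tuple splitter ("in_file", "voxel"): inputs are paired element-wise,
# so one job is created per (image, voxel) pair -- 3 jobs here
paired = list(zip(images, voxels))

# List splitter ["in_file", "voxel"]: the split runs over every
# combination of the inputs -- 3 x 3 = 9 jobs here
combined = list(product(images, voxels))

print(len(paired), len(combined))  # 3 9
```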

[6]:
mrgrid_varying_vox_sizes = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_dir.iterdir(),
    # Define a list of voxel sizes to resample the NIfTI files to,
    # the list must be the same length as the list of NIfTI files
    voxel=[
        (1.0, 1.0, 1.0),
        (1.0, 1.0, 1.0),
        (1.0, 1.0, 1.0),
        (0.5, 0.5, 0.5),
        (0.75, 0.75, 0.75),
        (0.5, 0.5, 0.5),
        (0.5, 0.5, 0.5),
        (1.0, 1.0, 1.0),
        (1.25, 1.25, 1.25),
        (1.25, 1.25, 1.25),
    ],
)

outputs = mrgrid_varying_vox_sizes()

pprint(outputs.out_file)
[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file.mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (1).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (2).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (3).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (4).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (5).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (6).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (7).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (8).mif'),
 ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (9).mif')]

Executing tasks in parallel

By default, Pydra will use the debug worker, which executes each task sequentially. This makes it easier to debug tasks and workflows; however, in most cases, once a workflow is tested, a concurrent worker is preferable so tasks can be executed in parallel (see Workers). To use multiple processes on a workstation, select the cf worker option when executing the task/workflow. Additional keyword arguments will be passed to the worker initialisation (e.g. n_procs=4).

Note that when multiprocessing in Python on Windows and macOS (and as good practice on Linux/POSIX OSs for compatibility), you need to place the execution code inside an if __name__ == "__main__" block in top-level scripts, so that the script can be imported, but not executed, by subprocesses.

[7]:
from pydra.tasks.mrtrix3.v3_0 import MrGrid

if (
    __name__ == "__main__"
):  # <-- Add this block to allow the script to be imported by subprocesses
    mrgrid = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
        in_file=nifti_dir.iterdir()
    )
    outputs = mrgrid(worker="cf", n_procs=4)  # <-- Select the "cf" worker here
    print("\n".join(str(p) for p in outputs.out_file))
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif

File-system locations

Output and intermediate files are typically generated during the course of a workflow/task run. In addition to this, Pydra generates a cache directory for each task, in which the task, results and any errors are stored in cloudpickle files for future reference (see Troubleshooting). By default, these cache directories are stored in a platform-specific application-cache directory

  • Windows: C:\Users\<username>\AppData\Local\pydra\<pydra-version>\run-cache

  • Linux: /home/<username>/.cache/pydra/<pydra-version>/run-cache

  • macOS: /Users/<username>/Library/Caches/pydra/<pydra-version>/run-cache

When a task runs, a unique hash is generated by the combination of all the inputs to the task and the operation to be performed. This hash is used to name the task cache directory within the specified cache root. Therefore, if you use the same cache root and in a subsequent run the same task is executed with the same inputs, then the path of its cache directory will be the same, and if Pydra finds existing results at that path, then the outputs generated by the previous run will be reused.
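The mechanism can be sketched in a few lines of plain Python: derive a directory name from a hash of the inputs, and reuse any result already stored there. This is a toy illustration of the general idea under simplified assumptions (JSON-serialisable inputs, a made-up "doubling" task), not Pydra's actual hashing or storage code.

```python
import hashlib
import json
from pathlib import Path
from tempfile import mkdtemp


def run_with_cache(inputs: dict, cache_root: Path) -> tuple[dict, bool]:
    """Run a toy task, reusing cached results when the input hash matches.

    Returns the result and a flag indicating whether it was a cache hit.
    """
    # Hash the inputs to get a deterministic cache-directory name
    key = hashlib.sha256(json.dumps(inputs, sort_keys=True).encode()).hexdigest()[:12]
    cache_dir = cache_root / f"task-{key}"
    result_file = cache_dir / "result.json"
    if result_file.exists():
        # Same inputs as a previous run: reuse the stored outputs
        return json.loads(result_file.read_text()), True
    cache_dir.mkdir(parents=True)
    result = {"doubled": [x * 2 for x in inputs["values"]]}  # the "work"
    result_file.write_text(json.dumps(result))
    return result, False


cache_root = Path(mkdtemp())
first, hit1 = run_with_cache({"values": [1, 2, 3]}, cache_root)
second, hit2 = run_with_cache({"values": [1, 2, 3]}, cache_root)
print(hit1, hit2)  # False True
```

The second call finds the directory created by the first and returns its stored result instead of recomputing, which is the behaviour Pydra exploits when the same task is executed with the same inputs under the same cache root.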

This cache will grow as more runs are executed, therefore care needs to be taken to ensure there is enough space on the target disk. To use a different location for this cache, simply provide the cache_root keyword argument to the execution call

[8]:
outputs = mrgrid(cache_root=Path("~/pydra-cache").expanduser())

pprint(outputs)
SplitOutputs(out_file=[ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')], return_code=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0], stderr=['mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/XYEqaPDVxVqXTzHQlIXqXQ0u.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/0UAqFzWsDK4FrUMp48Y3tT3Q.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/Cxi1aF3fIMcxRCSjogF5KWxU.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/oHwLMeaZqo9DZDNjN1GTPdVK.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ujOeHwdFcAefAZhnM6Jy8c1r.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/o7hRfp9mNCCElZfobqz3xhq9.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/i0VpEBOWfbZAVaBSo63bbH6x.nii"... 
[==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/76dfZTPtLLKjAyS96HBqcbCt.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/MTX6T5bC6O3hDpHJanr4VjXi.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ogyjey4XVnHrkgn5ZYXltMT0.nii"... [==================================================]\n'], stdout=['', '', '', '', '', '', '', '', '', ''])

To reuse results from alternative cache roots, while storing any newly generated task cache directories in the specified cache root, the readonly_caches keyword argument can be used

[9]:
from pydra.utils.general import default_run_cache_root

my_cache_root = Path("~/new-pydra-cache").expanduser()
my_cache_root.mkdir(exist_ok=True)

outputs = mrgrid(cache_root=my_cache_root, readonly_caches=[default_run_cache_root])

print(outputs)
SplitOutputs(out_file=[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')], return_code=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0], stderr=['mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/XYEqaPDVxVqXTzHQlIXqXQ0u.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/0UAqFzWsDK4FrUMp48Y3tT3Q.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/Cxi1aF3fIMcxRCSjogF5KWxU.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/oHwLMeaZqo9DZDNjN1GTPdVK.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ujOeHwdFcAefAZhnM6Jy8c1r.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/o7hRfp9mNCCElZfobqz3xhq9.nii"... 
[==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/i0VpEBOWfbZAVaBSo63bbH6x.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/76dfZTPtLLKjAyS96HBqcbCt.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/MTX6T5bC6O3hDpHJanr4VjXi.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ogyjey4XVnHrkgn5ZYXltMT0.nii"... [==================================================]\n'], stdout=['', '', '', '', '', '', '', '', '', ''])