Getting started
The basic runnable component of Pydra is a task. Tasks are conceptually similar to functions, in that they take inputs, operate on them and then return results. However, unlike functions, tasks are parameterised before they are executed, in a separate step. This enables parameterised tasks to be linked together into workflows that are checked for errors before they are executed, and allows modular execution workers and environments to be specified independently of the task being performed.
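The separation between parameterising a task and executing it can be illustrated with a plain-Python sketch. The `DeferredTask` class below is hypothetical and is not part of Pydra's API; it only mimics the idea of binding and checking the inputs up front, then running the operation later in a separate call.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class DeferredTask:
    """Hypothetical illustration (not Pydra's API): bind inputs now, execute later."""

    func: Callable[..., Any]
    inputs: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Check the inputs against the function signature *before* running,
        # analogous to catching errors before execution
        inspect.signature(self.func).bind(**self.inputs)

    def __call__(self) -> Any:
        # Execution is a separate step from parameterisation
        return self.func(**self.inputs)


def add(a: int, b: int) -> int:
    return a + b


task = DeferredTask(add, {"a": 1, "b": 2})  # parameterised, not yet run
print(task())  # 3
```

With this sketch, constructing `DeferredTask(add, {"a": 1})` raises a `TypeError` at construction time, before anything is executed.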
Tasks can encapsulate Python functions or shell commands, or be multi-component workflows, themselves constructed from task components, including nested workflows.
Preparation
Before we get started, let's set up some test data to play with. Here we create a sample JSON file in a temporary directory:
from pathlib import Path
from tempfile import mkdtemp
from pprint import pprint
import json
JSON_CONTENTS = {"a": True, "b": "two", "c": 3, "d": [7, 0.55, 6]}
test_dir = Path(mkdtemp())
json_file = test_dir / "test.json"
with open(json_file, "w") as f:
    json.dump(JSON_CONTENTS, f)
Next, we create a directory containing 10 randomly generated NIfTI files:
from fileformats.medimage import Nifti1
nifti_dir = test_dir / "nifti"
nifti_dir.mkdir()
for i in range(10):
    Nifti1.sample(nifti_dir, seed=i)  # Create a dummy NIfTI file in the destination directory
Note that when you run concurrent processes within a Jupyter notebook, the following snippet is also required:
import nest_asyncio
nest_asyncio.apply()
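As far as we understand, this snippet is needed because a Jupyter kernel already runs an asyncio event loop, and the standard library refuses to start another one from inside it; `nest_asyncio.apply()` patches asyncio so that such nested calls are allowed. The stdlib-only sketch below reproduces the underlying error outside Jupyter by calling `asyncio.run()` from within a running loop (the `inner` and `outer` coroutines are illustrative only).

```python
import asyncio


async def inner() -> int:
    return 42


async def outer() -> str:
    # An event loop is already running here, much like inside a Jupyter kernel
    coro = inner()
    try:
        asyncio.run(coro)
    except RuntimeError as err:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(err)
    return "no error"


message = asyncio.run(outer())
print(message)
```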
Running your first task
Pydra allows you to install independent packages containing pre-defined tasks (e.g., pydra-fsl, pydra-ants). The tasks from these packages are installed under the pydra.tasks.* namespace. You always have access to pydra.tasks.common; in addition, pydra.tasks.mrtrix3.v3_0 (from the pydra-mrtrix3 package) was installed for this tutorial. To use a pre-defined task:
1. import the task class from the pydra.tasks.* package it is in
2. instantiate it with appropriate parameters
3. "call" the resulting object (i.e. my_task(...)) to execute it as you would a function
To demonstrate, we load the contents of the example JSON file created above back from disk using the pydra.tasks.common.LoadJson task class:
# Import the task
from pydra.tasks.common import LoadJson
# Instantiate the task, providing the JSON file we want to load
load_json = LoadJson(file=json_file)
# Run the task to load the JSON file
outputs = load_json()
# Access the loaded JSON output contents and check they match original
assert outputs.out == JSON_CONTENTS
Iterating over inputs
It is straightforward to apply the same operation over a set of inputs using the split() method. For example, to re-grid all the NIfTI images stored in a directory, such as the sample ones generated in the Preparation section, we can import the MrGrid shell-command task from the pydra-mrtrix3 package and run it over every NIfTI file in the directory using the Task.split() method:
from pydra.tasks.mrtrix3.v3_0 import MrGrid
# Instantiate the task, "splitting" over all NIfTI files in the test directory
# by splitting the "in_file" input field over all files in the directory
mrgrid = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
    in_file=nifti_dir.iterdir()
)
# Run the task to resample all NIfTI files
outputs = mrgrid()
# Print the locations of the output files
pprint(outputs.out_file)
[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')]
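Conceptually, splitting a single input field runs the same operation once per element of the list and collects the outputs in order. The plain-Python sketch below mimics that behaviour; `split_over` and `regrid_name` are hypothetical helpers for illustration only and do not reflect how Pydra actually schedules, hashes or caches the split tasks.

```python
from pathlib import PurePath
from typing import Any, Callable, Iterable


def split_over(func: Callable[..., Any], field: str, values: Iterable[Any], **fixed: Any) -> list:
    """Hypothetical: call func once per value of `field`, keeping the other inputs fixed."""
    return [func(**{field: value}, **fixed) for value in values]


def regrid_name(in_file: str, voxel: tuple) -> str:
    # Stand-in for a real regridding command: just derive an output file name
    stem = PurePath(in_file).stem
    return f"{stem}_vox{voxel[0]}.mif"


outputs = split_over(regrid_name, "in_file", ["a.nii", "b.nii", "c.nii"], voxel=(0.5, 0.5, 0.5))
print(outputs)  # ['a_vox0.5.mif', 'b_vox0.5.mif', 'c_vox0.5.mif']
```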
It is also possible to iterate over inputs in pairs/n-tuples. For example, to use different voxel sizes for different images, both the list of images and the list of voxel sizes are passed to the split() method, and their combination is specified by a tuple "splitter".
Note that it is important to use a tuple, not a list, for the splitter definition in this case, because a list splitter is interpreted as a split over every combination of inputs (see Splitting and combining for more details on splitters).
mrgrid_varying_vox_sizes = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_dir.iterdir(),
    # Define a list of voxel sizes to resample the NIfTI files to;
    # the list must be the same length as the list of NIfTI files
    voxel=[
        (1.0, 1.0, 1.0),
        (1.0, 1.0, 1.0),
        (1.0, 1.0, 1.0),
        (0.5, 0.5, 0.5),
        (0.75, 0.75, 0.75),
        (0.5, 0.5, 0.5),
        (0.5, 0.5, 0.5),
        (1.0, 1.0, 1.0),
        (1.25, 1.25, 1.25),
        (1.25, 1.25, 1.25),
    ],
)
outputs = mrgrid_varying_vox_sizes()
pprint(outputs.out_file)
[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file.mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (1).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (2).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (3).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (4).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (5).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (6).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (7).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (8).mif'),
ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-6134c9fdb044248c905cbd279f4a5700/out_file (9).mif')]
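The difference between the tuple and list splitters described above can be sketched with the standard library: a tuple splitter pairs inputs element-wise, like `zip()`, whereas a list splitter iterates over every combination, like `itertools.product()`. This is only an analogy for how the input combinations are formed, not Pydra's implementation; the file names and voxel sizes are made up.

```python
from itertools import product

in_files = ["a.nii", "b.nii", "c.nii"]
voxels = [(1.0, 1.0, 1.0), (0.5, 0.5, 0.5), (1.25, 1.25, 1.25)]

# Tuple splitter ("in_file", "voxel"): element-wise pairing, so lengths must match
if len(in_files) != len(voxels):
    raise ValueError("tuple splitter inputs must have the same length")
tuple_combos = list(zip(in_files, voxels))

# List splitter ["in_file", "voxel"]: every file combined with every voxel size
list_combos = list(product(in_files, voxels))

print(len(tuple_combos))  # 3
print(len(list_combos))  # 9
```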
Executing tasks in parallel
By default, Pydra uses the debug worker, which executes each task sequentially. This makes it easier to debug tasks and workflows; however, in most cases, once a workflow has been tested, a concurrent worker is preferable so that tasks can be executed in parallel (see Workers). To use multiple processes on a workstation, select the cf worker option when executing the task/workflow. Additional keyword arguments will be passed to the worker on initialisation (e.g. n_procs=4).
Note that when using multiprocessing in Python on Windows and macOS (and as good practice on Linux/POSIX OSs, for compatibility), you need to place the execution code within an if __name__ == "__main__" block in top-level scripts, so that the script can be imported, but not executed, by subprocesses.
from pydra.tasks.mrtrix3.v3_0 import MrGrid
if __name__ == "__main__":  # <-- Add this block to allow the script to be imported by subprocesses
    mrgrid = MrGrid(operation="regrid", voxel=(0.5, 0.5, 0.5)).split(
        in_file=nifti_dir.iterdir()
    )
    outputs = mrgrid(worker="cf", n_procs=4)  # <-- Select the "cf" worker here
    print("\n".join(str(p) for p in outputs.out_file))
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif
/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif
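The need for the `if __name__ == "__main__"` guard is not specific to Pydra; it applies to any process-based parallelism in Python. The sketch below uses the standard library's `concurrent.futures.ProcessPoolExecutor` (we assume, without having checked here, that this is the mechanism the "cf" worker name refers to) with a hypothetical `fake_regrid` function standing in for a real task. Under the "spawn" start method, the default on Windows and macOS, each worker process imports the script, so any unguarded top-level code would be executed again in every subprocess.

```python
from concurrent.futures import ProcessPoolExecutor
from pathlib import PurePath


def fake_regrid(in_file: str) -> str:
    # Hypothetical stand-in for a real task; it must be defined at module level
    # so that it can be pickled and sent to the worker processes
    return str(PurePath(in_file).with_suffix(".mif"))


def run_parallel(in_files: list, n_procs: int = 4) -> list:
    # pool.map returns results in the same order as the inputs
    with ProcessPoolExecutor(max_workers=n_procs) as pool:
        return list(pool.map(fake_regrid, in_files))


if __name__ == "__main__":  # only the parent process executes this block
    results = run_parallel([f"img{i}.nii" for i in range(8)], n_procs=4)
    print(results)
```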
File-system locations
Output and intermediate files are typically generated during the course of a workflow/task run. In addition, Pydra generates a cache directory for each task, in which the task, its results and any errors are stored in cloudpickle files for future reference (see Troubleshooting). By default, these cache directories are stored in a platform-specific application cache directory:
Windows:
C:\Users\<username>\AppData\Local\pydra\<pydra-version>\run-cache
Linux:
/home/<username>/.cache/pydra/<pydra-version>/run-cache
macOS:
/Users/<username>/Library/Caches/pydra/<pydra-version>/run-cache
When a task runs, a unique hash is generated from the combination of all the inputs to the task and the operation to be performed. This hash is used to name the task cache directory within the specified cache root. Therefore, if you use the same cache root and the same task is executed with the same inputs in a subsequent run, the path of its cache directory will be the same, and if Pydra finds existing results at that path, the outputs generated by the previous run will be reused.
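The idea of naming a cache directory after a hash of the task and its inputs can be sketched with `hashlib` and `json`. The `cache_dir_name` function below is hypothetical; Pydra's real hashing handles many more input types (including file contents) and will produce different hashes, so treat this only as an illustration of why identical inputs map to the same directory.

```python
import hashlib
import json
from pathlib import PurePosixPath


def cache_dir_name(task_name: str, inputs: dict) -> str:
    """Hypothetical: derive a deterministic directory name from a task and its inputs."""
    # sort_keys=True makes the serialisation independent of dict insertion order
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{task_name}-{digest}"


cache_root = PurePosixPath("/tmp/example-cache")  # illustrative location only
first = cache_root / cache_dir_name("mrgrid", {"operation": "regrid", "voxel": [0.5, 0.5, 0.5]})
again = cache_root / cache_dir_name("mrgrid", {"voxel": [0.5, 0.5, 0.5], "operation": "regrid"})
other = cache_root / cache_dir_name("mrgrid", {"operation": "regrid", "voxel": [1.0, 1.0, 1.0]})

print(first == again)  # True: same inputs, same cache directory
print(first == other)  # False: different inputs, different directory
```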
This cache will grow as more runs are executed, so care needs to be taken to ensure there is enough space on the target disk. To use a different location for this cache, simply provide the cache_root keyword argument to the execution call:
outputs = mrgrid(cache_root=Path("~/pydra-cache").expanduser())
pprint(outputs)
SplitOutputs(out_file=[ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'), ImageFormat('/home/runner/pydra-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')], return_code=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0], stderr=['mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/XYEqaPDVxVqXTzHQlIXqXQ0u.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/0UAqFzWsDK4FrUMp48Y3tT3Q.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/Cxi1aF3fIMcxRCSjogF5KWxU.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/oHwLMeaZqo9DZDNjN1GTPdVK.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ujOeHwdFcAefAZhnM6Jy8c1r.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/o7hRfp9mNCCElZfobqz3xhq9.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/i0VpEBOWfbZAVaBSo63bbH6x.nii"... 
[==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/76dfZTPtLLKjAyS96HBqcbCt.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/MTX6T5bC6O3hDpHJanr4VjXi.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ogyjey4XVnHrkgn5ZYXltMT0.nii"... [==================================================]\n'], stdout=['', '', '', '', '', '', '', '', '', ''])
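Result reuse within a cache root can be pictured as "look for a stored result under the hashed directory name, otherwise compute it and store it there". The `run_with_cache` function below is hypothetical and stores JSON rather than the cloudpickle files Pydra uses; it only illustrates why a second call with the same inputs and the same cache root returns the earlier result without recomputing it.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any, Callable


def run_with_cache(func: Callable[..., Any], inputs: dict, cache_root: Path) -> Any:
    """Hypothetical: reuse a stored result if one exists for these inputs."""
    key = json.dumps({"func": func.__name__, "inputs": inputs}, sort_keys=True)
    task_dir = cache_root / f"{func.__name__}-{hashlib.sha256(key.encode()).hexdigest()}"
    result_file = task_dir / "result.json"
    if result_file.exists():
        return json.loads(result_file.read_text())  # cache hit: no recomputation
    result = func(**inputs)
    task_dir.mkdir(parents=True, exist_ok=True)
    result_file.write_text(json.dumps(result))
    return result


calls = []


def square(x: int) -> int:
    calls.append(x)  # record each real execution
    return x * x


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    first = run_with_cache(square, {"x": 4}, root)  # computed and stored
    second = run_with_cache(square, {"x": 4}, root)  # loaded from the cache

print(first, second, len(calls))  # 16 16 1
```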
To check alternative cache roots for existing results, while storing any newly generated task cache directories in the specified cache root, use the readonly_caches keyword argument:
from pydra.utils.general import default_run_cache_root
my_cache_root = Path("~/new-pydra-cache").expanduser()
my_cache_root.mkdir(exist_ok=True)
outputs = mrgrid(cache_root=my_cache_root, readonly_caches=[default_run_cache_root])
print(outputs)
SplitOutputs(out_file=[ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file.mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (1).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (2).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (3).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (4).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (5).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (6).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (7).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (8).mif'), ImageFormat('/home/runner/.cache/pydra/1.0a3/run-cache/workflow-cd3289da91216f00f0b277edde33504c/out_file (9).mif')], return_code=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0], stderr=['mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/XYEqaPDVxVqXTzHQlIXqXQ0u.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/0UAqFzWsDK4FrUMp48Y3tT3Q.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/Cxi1aF3fIMcxRCSjogF5KWxU.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/oHwLMeaZqo9DZDNjN1GTPdVK.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ujOeHwdFcAefAZhnM6Jy8c1r.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/o7hRfp9mNCCElZfobqz3xhq9.nii"... 
[==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/i0VpEBOWfbZAVaBSo63bbH6x.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/76dfZTPtLLKjAyS96HBqcbCt.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/MTX6T5bC6O3hDpHJanr4VjXi.nii"... [==================================================]\n', 'mrgrid: reslicing "/tmp/tmpi8n7flbs/nifti/ogyjey4XVnHrkgn5ZYXltMT0.nii"... [==================================================]\n'], stdout=['', '', '', '', '', '', '', '', '', ''])
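One way to picture readonly_caches is as an extra lookup path: existing task directories may be found in the writable cache root or in any read-only cache, while new results are only ever written to the writable root. The `locate_cached_dir` helper below is hypothetical and does not describe the order in which Pydra actually searches; it is merely consistent with the output above, where the returned paths point into the default cache that was passed as read-only.

```python
import tempfile
from pathlib import Path
from typing import Optional, Sequence


def locate_cached_dir(dir_name: str, cache_root: Path, readonly_caches: Sequence[Path]) -> Optional[Path]:
    """Hypothetical: return the first existing task directory, or None if absent."""
    for root in [cache_root, *readonly_caches]:
        candidate = root / dir_name
        if candidate.is_dir():
            return candidate
    return None


with tempfile.TemporaryDirectory() as tmp:
    writable = Path(tmp) / "new-cache"
    readonly = Path(tmp) / "old-cache"
    writable.mkdir()
    (readonly / "task-abc123").mkdir(parents=True)  # result from an earlier run

    hit = locate_cached_dir("task-abc123", writable, [readonly])
    miss = locate_cached_dir("task-def456", writable, [readonly])
    hit_is_readonly = hit is not None and hit.parent == readonly
    print(hit_is_readonly, miss)  # True None
```

In this picture, a miss means the task is executed and its new cache directory is created under the writable cache root only.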