Advanced execution

One of the key design features of Pydra is the separation between the parameterisation of the task to be executed and the parameterisation of where and how the task should be executed (e.g. on the cloud, on an HPC cluster, …). This tutorial steps you through some of the available options for executing a task.

Binder

Remember that before attempting to run multi-process code in Jupyter notebooks, the following snippet must be called

[1]:
import nest_asyncio

nest_asyncio.apply()

Submitter

If you want to access a richer Result object, you can use a Submitter object to initiate the task execution. For example, using the TenToThePower task from the testing package

[2]:
from pydra.engine.submitter import Submitter
from pydra.tasks.testing import TenToThePower


ten_to_the_power = TenToThePower(p=3)

with Submitter() as submitter:
    result = submitter(ten_to_the_power)

print(result)
Result(cache_dir=PosixPath('/home/runner/.cache/pydra/1.0a0/run-cache/python-c2d7b5fda7cdf238279c3ceab3cae3f9'), outputs=TenToThePowerOutputs(out=1000), runtime=None, errored=False, task=TenToThePower(p=3))

The Result object contains the following fields (accessed as shown in the snippet after this list)

  • outputs: the outputs of the task (if there is only one output it is called out by default)

  • runtime: information about the peak memory and CPU usage

  • errored: the error status of the task

  • task: the task object that generated the results

  • cache_dir: the output directory the results are stored in
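
For example, based on the Result printed above, these fields can be accessed as attributes of the returned object:

print(result.outputs.out)  # 1000
print(result.errored)      # False
print(result.runtime)      # None in the run above
print(result.cache_dir)    # the directory the results are stored in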

Workers

Pydra supports several workers with which to execute tasks

  • debug (default)

  • cf

  • slurm

  • sge

  • psij

  • dask (experimental)

By default, the debug worker is used, which runs tasks serially in a single process without use of the asyncio module. This makes it easier to debug errors in workflows and Python tasks; however, when using Pydra in production you will typically want to parallelise the execution for efficiency.

If running on a local workstation, then the cf (ConcurrentFutures) worker is a good option because it is able to spread the tasks to be run over multiple processes and maximise CPU usage.

If you have access to a high-performance computing (HPC) cluster, then the SLURM, SGE and PSI/J workers can be used to submit each workflow node as a separate job to the HPC scheduler. There is also an experimental Dask worker, which provides a range of execution backends to choose from.
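
As a rough sketch (assuming the script is run on a SLURM-managed cluster), selecting the SLURM worker by name could look like this; any scheduler-specific options the worker accepts (for example, sbatch arguments) would be passed as keyword arguments to the execution call, as described below:

from pydra.tasks.testing import TenToThePower

if __name__ == "__main__":

    ten_to_the_power = TenToThePower().split(p=[1, 2, 3])

    # Each task in the split is submitted to the scheduler as a separate job
    outputs = ten_to_the_power(worker="slurm")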

To specify a worker, the abbreviation can be passed either as a string or using the class itself. Additional parameters can be passed to the worker initialisation as keyword arguments to the execution call. For example, if we want to run five tasks using the ConcurrentFutures worker but only use three CPUs, we can pass n_procs=3 to the execution call.

Remember that when calling multi-process code in a top-level script the call must be enclosed within an if __name__ == "__main__" block to allow the worker processes to import the module without re-executing it.

[3]:
import tempfile

cache_root = tempfile.mkdtemp()

if __name__ == "__main__":

    ten_to_the_power = TenToThePower().split(p=[1, 2, 3, 4, 5])

    # Run the 5 tasks in parallel split across 3 processes
    outputs = ten_to_the_power(worker="cf", n_procs=3, cache_root=cache_root)

    p1, p2, p3, p4, p5 = outputs.out

    print(f"10^5 = {p5}")
10^5 = 100000

Alternatively, the worker object can be initialised in the calling code and passed directly to the execution call

[4]:
from pydra.workers import cf

ten_to_the_power = TenToThePower().split(p=[6, 7, 8, 9, 10])

# Run the 5 tasks in parallel split across 3 processes
outputs = ten_to_the_power(worker=cf.Worker(n_procs=3))

p6, p7, p8, p9, p10 = outputs.out

print(f"10^10 = {p10}")
10^10 = 10000000000

Reusing previously generated results

Pydra caches all task results in the runtime cache (see File-system locations) as long as the hashes of the inputs provided to the task are exactly the same. Here we go through some of the practicalities of this caching and hashing (see Caches and hashes for more details and issues to consider).
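
For example, by pointing cache_root at a fixed location (the path below is purely illustrative), re-running the same script in a new Python session will reuse the previously generated results rather than re-executing the task:

from pydra.tasks.testing import TenToThePower

ten_to_the_power = TenToThePower(p=6)

# The first run executes the task and writes the result under the cache root;
# subsequent runs with identical inputs load the cached result instead
outputs = ten_to_the_power(cache_root="/tmp/my-pydra-cache")
print(outputs.out)  # 1000000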

If we attempt to run the same task with the same parameterisation, the cache directory will point to the same location and the results will be reused

[5]:
from copy import copy
from pathlib import Path
import tempfile
from pprint import pprint
from fileformats.medimage import Nifti1
from pydra.engine.submitter import Submitter
from pydra.tasks.mrtrix3.v3_0 import MrGrid

# Make a temporary directory
test_dir = Path(tempfile.mkdtemp())
nifti_dir = test_dir / "nifti"
nifti_dir.mkdir()

# Generate some random NIfTI files to work with
nifti_files = [Nifti1.sample(nifti_dir, seed=i) for i in range(10)]

VOX_SIZES = [
    (0.5, 0.5, 0.5),
    (0.25, 0.25, 0.25),
    (0.1, 0.1, 0.1),
    (0.35, 0.35, 0.35),
    (0.1, 0.1, 0.1),
    (0.5, 0.5, 0.5),
    (0.25, 0.25, 0.25),
    (0.2, 0.2, 0.2),
    (0.35, 0.35, 0.35),
    (0.1, 0.1, 0.1),
]

mrgrid_varying_vox = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_files,
    voxel=VOX_SIZES,
)

submitter = Submitter(cache_root=test_dir / "cache")


with submitter:
    result1 = submitter(mrgrid_varying_vox)


mrgrid_varying_vox2 = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_files,
    voxel=copy(VOX_SIZES),
)

# Result from previous run is reused as the task and inputs are identical
with submitter:
    result2 = submitter(mrgrid_varying_vox2)

# Check that the output directory is the same for both runs
assert result2.cache_dir == result1.cache_dir

# Change the voxel size that one of the NIfTI files is resampled to
mrgrid_varying_vox2.voxel[2] = [0.25]

# The results are recalculated as one of the inputs has changed
with submitter:
    result3 = submitter(mrgrid_varying_vox2)

# The output directory will be different as the inputs are now different
assert result3.cache_dir != result1.cache_dir

Note that for file objects, the contents of the files are used to calculate the hash, not their paths. Therefore, when inputting large files there might be some additional overhead on the first run (the file hashes themselves are cached by path and mtime, so they shouldn't need to be recalculated unless they are modified). However, this makes the hashes invariant to file-system movement. For example, changing the name of one of the files in the nifti directory won't invalidate the hash.

[6]:
# Rename a NIfTI file within the test directory
nifti_files[0] = Nifti1(
    nifti_files[0].fspath.rename(nifti_files[0].fspath.with_name("first.nii"))
)

mrgrid_varying_vox3 = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_files,
    voxel=VOX_SIZES,
)

# Result from previous run is reused as contents of the files have not changed, despite
# the file names changing
with submitter:
    result4 = submitter(mrgrid_varying_vox3)

assert result4.cache_dir == result1.cache_dir

# Replace the first NIfTI file with a new file
nifti_files[0] = Nifti1.sample(nifti_dir, seed=100)

# Update the in_file input field to include the new file
mrgrid_varying_vox4 = MrGrid(operation="regrid").split(
    ("in_file", "voxel"),
    in_file=nifti_files,
    voxel=VOX_SIZES,
)

# The results from the previous runs are not reused as the contents of the first file have changed
with submitter:
    result4 = submitter(mrgrid_varying_vox4)

# The cache directory for the new run is different
assert result4.cache_dir != result1.cache_dir

Environments and hooks

For shell tasks, it is possible to specify that the command runs within a specific software environment, such as those provided by software containers (e.g. Docker or Singularity/Apptainer). This is done by providing the environment to the submitter/execution call,

[7]:
import tempfile
from pydra.tasks.mrtrix3.v3_0 import MrGrid
from pydra.environments import docker

test_dir = tempfile.mkdtemp()

nifti_file = Nifti1.sample(test_dir, seed=0)

# Instantiate the task to regrid a single NIfTI file to 0.5 mm isotropic voxels
mrgrid = MrGrid(in_file=nifti_file, operation="regrid", voxel=(0.5, 0.5, 0.5))

# Run the task within the MRtrix3 Docker container to resample the NIfTI file
outputs = mrgrid(environment=docker.Environment(image="mrtrix3/mrtrix3", tag="latest"))

# Print the locations of the output files
pprint(outputs.out_file)
ImageFormat('/home/runner/.cache/pydra/1.0a0/run-cache/shell-cc1c66c8695b0df72632ee1e0c950c20/out_file.mif')

Of course, for this to work, Docker needs to be installed and configured for sudo-less execution. See Containers and Environments for more details on how to utilise containers and add support for other software environments.
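
If Singularity/Apptainer is available instead of Docker, an analogous environment can be used. The module and class below (pydra.environments.singularity.Environment) are assumed to mirror the Docker environment shown above and should be checked against the Containers and Environments documentation:

from pydra.environments import singularity  # assumed to mirror the docker module

mrgrid_sing = MrGrid(in_file=nifti_file, operation="regrid", voxel=(0.5, 0.5, 0.5))

# Run the same regrid command inside a Singularity/Apptainer container
outputs = mrgrid_sing(
    environment=singularity.Environment(image="mrtrix3/mrtrix3", tag="latest")
)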

It is also possible to specify functions to run immediately before and after the task is executed, by passing a pydra.engine.hooks.TaskHooks object to the hooks keyword argument. Each hook callable should take the pydra.engine.job.Job object as its first argument (the post-run hooks also receive the Result object, as in the example below) and return None. The available hooks to attach functions to are:

  • pre_run: before the task cache directory is created

  • pre_run_task: after the cache directory has been created and the inputs resolved but before the task is executed

  • post_run_task: after the task has been run and the outputs collected

  • post_run: after the cache directory has been finalised

[8]:
from pydra.engine.job import Job
from pydra.engine.hooks import TaskHooks
from pydra.engine.result import Result
import os
import platform


def notify_task_completion(task: Job, result: Result):
    # Print a message to the terminal
    print(f"Job completed! Results are stored in {str(task.cache_dir)!r}")

    # Platform-specific notifications
    if platform.system() == "Darwin":  # macOS
        os.system(
            'osascript -e \'display notification "Job has completed successfully!" '
            'with title "Job Notification"\''
        )
    elif platform.system() == "Linux":  # Linux
        os.system('notify-send "Job Notification" "Job has completed successfully!"')
    elif platform.system() == "Windows":  # Windows
        os.system('msg * "Job has completed successfully!"')


# Run the task again with the notification hook attached and a fresh cache root
outputs = mrgrid(
    hooks=TaskHooks(post_run=notify_task_completion), cache_root=tempfile.mkdtemp()
)

# Print the locations of the output files
pprint(outputs.out_file)
Job completed! Results are stored in '/tmp/tmp6ra1pbac/shell-cc1c66c8695b0df72632ee1e0c950c20'
ImageFormat('/tmp/tmp6ra1pbac/shell-cc1c66c8695b0df72632ee1e0c950c20/out_file.mif')
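
The other hooks can be attached in the same way. As a rough sketch (assuming, as in the example above, that pre-run hooks receive just the Job and post-run hooks also receive the Result), the execution of the task body could be timed by combining the pre_run_task and post_run_task hooks:

import time

start_times = {}


def record_start(task: Job):
    # pre_run_task: runs after the cache directory is created, before execution
    start_times[str(task.cache_dir)] = time.time()


def report_duration(task: Job, result: Result):
    # post_run_task: runs after the task body has completed and outputs collected
    elapsed = time.time() - start_times.pop(str(task.cache_dir))
    print(f"Task in {str(task.cache_dir)!r} took {elapsed:.2f} seconds")


outputs = mrgrid(
    hooks=TaskHooks(pre_run_task=record_start, post_run_task=report_duration),
    cache_root=tempfile.mkdtemp(),
)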

Provenance and auditing

Work in progress…