2. FunctionTask

import nest_asyncio

nest_asyncio.apply()

A FunctionTask is a Task that can be created from any Python function by using the pydra decorator pydra.mark.task:

import pydra

@pydra.mark.task
def add_var(a, b):
    return a + b

Once we decorate the function, we can create a pydra Task and specify the input:

task0 = add_var(a=4, b=5)

We can check the type of task0:

type(task0)
pydra.engine.task.FunctionTask

We can also check that the task has the correct values of a and b, which should be saved in the task inputs:

print(f'a = {task0.inputs.a}')
print(f'b = {task0.inputs.b}')
a = 4
b = 5
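
To make the pattern above concrete, here is a minimal standard-library sketch of how a decorator can turn a plain function into a callable object that stores its inputs. The names ToyTask and toy_task are hypothetical, and this is not how pydra implements FunctionTask; it only mirrors the behaviour shown so far.

```python
from types import SimpleNamespace


class ToyTask:
    """Hypothetical stand-in for a task: a function bundled with its inputs."""

    def __init__(self, func, **inputs):
        self.func = func
        # Keep the inputs as attributes, so they read like `task.inputs.a`.
        self.inputs = SimpleNamespace(**inputs)

    def __call__(self):
        # Run the wrapped function with the stored inputs.
        return self.func(**vars(self.inputs))


def toy_task(func):
    """Hypothetical decorator: calling the decorated name builds a ToyTask."""

    def make_task(**inputs):
        return ToyTask(func, **inputs)

    return make_task


@toy_task
def toy_add(a, b):
    return a + b


toy0 = toy_add(a=4, b=5)             # nothing is computed yet
print(toy0.inputs.a, toy0.inputs.b)  # the inputs are stored on the object
print(toy0())                        # the function runs only when the task is called
```

The point of the design is that creating the task and running it are two separate steps, which is what later makes it possible to inspect or change the inputs before execution.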

We can also check the content of the entire inputs:

task0.inputs
Inputs(a=4, b=5, _func=b'\x80\x05\x95\xbb\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x00K\x02K\x02K\x03C\x0c\x97\x00|\x00|\x01z\x00\x00\x00S\x00\x94N\x85\x94)\x8c\x01a\x94\x8c\x01b\x94\x86\x94\x8c!/tmp/ipykernel_6452/3542708107.py\x94\x8c\x07add_var\x94h\x0eK\x03C\x0b\x80\x00\xe0\x0b\x0c\x88q\x895\x80L\x94C\x00\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94uNNNt\x94R\x94h\x00\x8c\x12_function_setstate\x94\x93\x94h\x18}\x94}\x94(h\x15h\x0e\x8c\x0c__qualname__\x94h\x0e\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.')

As you can see, task0.inputs also contains information about the function, which is an inseparable part of the FunctionTask.
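
The _func field shown above is a byte string, and the readable fragments in it suggest it was produced by cloudpickle. As a rough standard-library analogue (an assumption for illustration, not pydra's mechanism), a simple function can be turned into bytes and rebuilt with marshal and types.FunctionType. The name add_var_plain is hypothetical.

```python
import marshal
import types


def add_var_plain(a, b):
    return a + b


# Serialize the function's compiled code object into bytes ...
payload = marshal.dumps(add_var_plain.__code__)
print(type(payload))

# ... and rebuild an equivalent function from those bytes.
code = marshal.loads(payload)
rebuilt = types.FunctionType(code, globals(), "add_var_plain")
print(rebuilt(4, 5))
```

Unlike cloudpickle, this sketch ignores defaults, closures and globals, and the marshal format is specific to the Python version, so it is only meant to show that a function can travel as data alongside the other inputs.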

Once the task has its inputs set, we can run it. Since a Task is a “callable object”, we can use the syntax:

task0()
Result(output=Output(out=9), runtime=None, errored=False)

As you can see, the result was returned right away, but we can also access it later:

task0.result()
Result(output=Output(out=9), runtime=None, errored=False)

Result contains more than just an output, so if we want to get the task output, we can type:

result = task0.result()
result.output.out
9

And if we want to see the input that was used in the task, we can set the optional argument return_inputs to True:

task0.result(return_inputs=True)
({'add_var.a': 4, 'add_var.b': 5},
 Result(output=Output(out=9), runtime=None, errored=False))

2.1. Type-checking

2.1.1. What is Type-checking?

Type-checking is the process of verifying the type of a value at compile time or run time. It ensures that operations on variables, and assignments to them, are semantically meaningful and can be executed without type errors, which improves code reliability and maintainability.

2.1.2. Why Use Type-checking?

  1. Error Prevention: Type-checking helps catch type mismatches early, preventing potential runtime errors.

  2. Improved Readability: Type annotations make it easier to understand what types of values a function expects and returns.

  3. Better Documentation: Explicitly stating expected types acts as inline documentation, simplifying code collaboration and review.

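  4. Optimized Performance: In compiled languages, type-related optimizations can be made during compilation when types are explicitly specified. The standard Python interpreter (CPython) does not use annotations for optimization.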

2.1.3. How is Type-checking Implemented in Pydra?

2.1.3.1. Static Type-Checking

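Static type-checking relies on Python’s type annotations. You annotate the types of your function arguments and the return type, and then use a tool like mypy to check statically whether the function is used correctly according to those annotations. As the traceback below shows, pydra also reads these annotations itself: when a task is created, it checks each input value against the annotated type and raises a TypeError if the value cannot be coerced.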

@pydra.mark.task
def add(a: int, b: int) -> int:
    return a + b
# This usage is correct according to static type hints:
task1a = add(a=5, b=3)
task1a()
Result(output=Output(out=8), runtime=None, errored=False)
# This usage is incorrect according to static type hints:
task1b = add(a="hello", b="world")
task1b()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[13], line 2
      1 # This usage is incorrect according to static type hints:
----> 2 task1b = add(a="hello", b="world")
      3 task1b()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/mark/functions.py:46, in task.<locals>.decorate(**kwargs)
     44 @wraps(func)
     45 def decorate(**kwargs):
---> 46     return FunctionTask(func=func, **kwargs)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/task.py:155, in FunctionTask.__init__(self, func, audit_flags, cache_dir, cache_locations, input_spec, cont_dim, messenger_args, messengers, name, output_spec, rerun, **kwargs)
    153 if name is None:
    154     name = func.__name__
--> 155 super().__init__(
    156     name,
    157     inputs=kwargs,
    158     cont_dim=cont_dim,
    159     audit_flags=audit_flags,
    160     messengers=messengers,
    161     messenger_args=messenger_args,
    162     cache_dir=cache_dir,
    163     cache_locations=cache_locations,
    164     rerun=rerun,
    165 )
    166 if output_spec is None:
    167     name = "Output"

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:175, in TaskBase.__init__(self, name, audit_flags, cache_dir, cache_locations, inputs, cont_dim, messenger_args, messengers, rerun)
    172             raise ValueError(f"Unknown input set {inputs!r}")
    173         inputs = self._input_sets[inputs]
--> 175 self.inputs = attr.evolve(self.inputs, **inputs)
    177 # checking if metadata is set properly
    178 self.inputs.check_metadata()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/attr/_funcs.py:413, in evolve(*args, **changes)
    410     if init_name not in changes:
    411         changes[init_name] = getattr(inst, attr_name)
--> 413 return cls(**changes)

File <attrs generated init pydra.engine.helpers.Inputs>:3, in __init__(self, a, b, func)
      1 def __init__(self, *, a, b, func=attr_dict['_func'].default):
      2     _setattr = _cached_setattr_get(self)
----> 3     _setattr('a', __attr_converter_a(a))
      4     _setattr('b', __attr_converter_b(b))
      5     _setattr('_func', __attr_converter__func(func))

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:154, in TypeParser.__call__(self, obj)
    152     coerced = StateArray(self(o) for o in obj)  # type: ignore[assignment]
    153 else:
--> 154     coerced = self.coerce(obj)
    155 return coerced

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:294, in TypeParser.coerce(self, object_)
    287         msg = (
    288             f" (part of coercion from {object_} to {self.pattern}"
    289             if obj is not object_
    290             else ""
    291         )
    292         raise TypeError(f"Cannot coerce {obj!r} into {type_}{msg}") from e
--> 294 return expand_and_coerce(object_, self.pattern)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:167, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
    165     return attr.NOTHING
    166 if not isinstance(pattern, tuple):
--> 167     return coerce_basic(obj, pattern)
    168 origin, pattern_args = pattern
    169 if origin is ty.Union:

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:203, in TypeParser.coerce.<locals>.coerce_basic(obj, pattern)
    201 if self.is_instance(obj, pattern):
    202     return obj
--> 203 self.check_coercible(obj, pattern)
    204 return coerce_obj(obj, pattern)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:470, in TypeParser.check_coercible(self, source, target)
    467         return t._name  # typing generics for Python < 3.10
    469 if not matches_criteria(self.coercible):
--> 470     raise TypeError(
    471         f"Cannot coerce {repr(source)} into {target} as the coercion doesn't match "
    472         f"any of the explicit inclusion criteria: "
    473         + ", ".join(
    474             f"{type_name(s)} -> {type_name(t)}" for s, t in self.coercible
    475         )
    476     )
    477 matches_not_coercible = matches_criteria(self.not_coercible)
    478 if matches_not_coercible:

TypeError: Cannot coerce 'hello' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool_ -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
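
The traceback above shows that the check happens while the inputs are being set, and that only certain conversions (for example int -> float) are accepted. A minimal sketch of that idea, using only the standard library, might look like the following. The helper check_inputs and the tiny ALLOWED_COERCIONS table are hypothetical and far simpler than pydra's TypeParser; the sketch handles plain classes only, not generics such as List[int].

```python
import inspect
import typing

# Hypothetical, much smaller table than the one in the error message above:
# (source type, target type) pairs that may be converted silently.
ALLOWED_COERCIONS = {(int, float)}


def check_inputs(func, **inputs):
    """Validate keyword inputs against the annotations of `func`.

    Returns the (possibly coerced) inputs, or raises TypeError.
    """
    hints = typing.get_type_hints(func)
    params = inspect.signature(func).parameters
    checked = {}
    for name, value in inputs.items():
        if name not in params:
            raise TypeError(f"{func.__name__}() has no input named {name!r}")
        expected = hints.get(name)
        if expected is None or isinstance(value, expected):
            checked[name] = value            # unannotated or already correct
        elif (type(value), expected) in ALLOWED_COERCIONS:
            checked[name] = expected(value)  # e.g. int -> float
        else:
            raise TypeError(f"Cannot coerce {value!r} into {expected}")
    return checked


def add_ints(a: int, b: int) -> int:
    return a + b


def scale(x: float, factor: float) -> float:
    return x * factor


print(check_inputs(add_ints, a=5, b=3))
print(check_inputs(scale, x=2, factor=1.5))  # the int 2 is coerced to 2.0

try:
    check_inputs(add_ints, a="hello", b="world")
except TypeError as err:
    print(err)
```

Checking at construction time means a wrong input is reported before any work is done, which matters when the function itself is expensive.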

2.1.3.2. Dynamic Type-Checking

Dynamic type-checking is done at runtime. Add dynamic type checks if you want to enforce types when the function is executed.

@pydra.mark.task
def add(a, b):
    if not (isinstance(a, int) and isinstance(b, int)):
        raise TypeError("Both inputs should be integers.")
    return a + b
# This usage is correct and will not raise a runtime error:
task1c = add(a=5, b=3)
task1c()
Result(output=Output(out=8), runtime=None, errored=False)
# This usage is incorrect and will raise a runtime TypeError:
task1d = add(a="hello", b="world")
task1d()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[16], line 3
      1 # This usage is incorrect and will raise a runtime TypeError:
      2 task1d = add(a="hello", b="world")
----> 3 task1d()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:454, in TaskBase.__call__(self, submitter, plugin, plugin_kwargs, rerun, **kwargs)
    452         res = sub(self)
    453 else:  # tasks without state could be run without a submitter
--> 454     res = self._run(rerun=rerun, **kwargs)
    455 return res

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:529, in TaskBase._run(self, rerun, **kwargs)
    527 try:
    528     self.audit.monitor()
--> 529     self._run_task()
    530     result.output = self._collect_outputs(output_dir=output_dir)
    531 except Exception:

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/task.py:202, in FunctionTask._run_task(self)
    200 del inputs["_func"]
    201 self.output_ = None
--> 202 output = cp.loads(self.inputs._func)(**inputs)
    203 output_names = [el[0] for el in self.output_spec.fields]
    204 if output is None:

Cell In[14], line 4, in add(a, b)
      1 @pydra.mark.task
      2 def add(a, b):
      3     if not (isinstance(a, int) and isinstance(b, int)):
----> 4         raise TypeError("Both inputs should be integers.")
      5     return a + b

TypeError: Both inputs should be integers.
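
One detail to keep in mind with isinstance-based checks like the one above: in Python, bool is a subclass of int, so a value such as True passes an isinstance(value, int) test. If that is not wanted, a stricter helper can be used. The name is_strict_int is hypothetical.

```python
def is_strict_int(value):
    """True for int values, but not for bool (which is a subclass of int)."""
    return isinstance(value, int) and not isinstance(value, bool)


print(isinstance(True, int))  # True: bool is a subclass of int
print(is_strict_int(True))    # False
print(is_strict_int(5))       # True
```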

2.1.3.3. Checking Complex Types

For more complex types like lists, dictionaries, or custom objects, we can use type hints combined with dynamic checks.

from typing import List, Tuple

@pydra.mark.task
def sum_of_pairs(pairs: List[Tuple[int, int]]) -> List[int]:
    if not all(isinstance(pair, tuple) and len(pair) == 2 for pair in pairs):
        raise ValueError("Input should be a list of pairs (tuples with 2 integers each).")
    return [sum(pair) for pair in pairs]
# Correct usage
task1e = sum_of_pairs(pairs=[(1, 2), (3, 4)])  
task1e()
Result(output=Output(out=[3, 7]), runtime=None, errored=False)
# This will raise a TypeError: pydra rejects the str "4" when the task is created
task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])
task1f()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[19], line 2
      1 # This will raise a TypeError: pydra rejects the str "4" when the task is created
----> 2 task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])  
      3 task1f()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/mark/functions.py:46, in task.<locals>.decorate(**kwargs)
     44 @wraps(func)
     45 def decorate(**kwargs):
---> 46     return FunctionTask(func=func, **kwargs)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/task.py:155, in FunctionTask.__init__(self, func, audit_flags, cache_dir, cache_locations, input_spec, cont_dim, messenger_args, messengers, name, output_spec, rerun, **kwargs)
    153 if name is None:
    154     name = func.__name__
--> 155 super().__init__(
    156     name,
    157     inputs=kwargs,
    158     cont_dim=cont_dim,
    159     audit_flags=audit_flags,
    160     messengers=messengers,
    161     messenger_args=messenger_args,
    162     cache_dir=cache_dir,
    163     cache_locations=cache_locations,
    164     rerun=rerun,
    165 )
    166 if output_spec is None:
    167     name = "Output"

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:175, in TaskBase.__init__(self, name, audit_flags, cache_dir, cache_locations, inputs, cont_dim, messenger_args, messengers, rerun)
    172             raise ValueError(f"Unknown input set {inputs!r}")
    173         inputs = self._input_sets[inputs]
--> 175 self.inputs = attr.evolve(self.inputs, **inputs)
    177 # checking if metadata is set properly
    178 self.inputs.check_metadata()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/attr/_funcs.py:413, in evolve(*args, **changes)
    410     if init_name not in changes:
    411         changes[init_name] = getattr(inst, attr_name)
--> 413 return cls(**changes)

File <attrs generated init pydra.engine.helpers.Inputs-1>:3, in __init__(self, pairs, func)
      1 def __init__(self, *, pairs, func=attr_dict['_func'].default):
      2     _setattr = _cached_setattr_get(self)
----> 3     _setattr('pairs', __attr_converter_pairs(pairs))
      4     _setattr('_func', __attr_converter__func(func))

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:154, in TypeParser.__call__(self, obj)
    152     coerced = StateArray(self(o) for o in obj)  # type: ignore[assignment]
    153 else:
--> 154     coerced = self.coerce(obj)
    155 return coerced

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:294, in TypeParser.coerce(self, object_)
    287         msg = (
    288             f" (part of coercion from {object_} to {self.pattern}"
    289             if obj is not object_
    290             else ""
    291         )
    292         raise TypeError(f"Cannot coerce {obj!r} into {type_}{msg}") from e
--> 294 return expand_and_coerce(object_, self.pattern)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:194, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
    192     return coerce_tuple(type_, obj_args, pattern_args)
    193 if issubclass(origin, ty.Iterable):
--> 194     return coerce_sequence(type_, obj_args, pattern_args)
    195 assert False, f"Coercion from {obj} to {pattern} is not handled"

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:270, in TypeParser.coerce.<locals>.coerce_sequence(type_, obj_args, pattern_args)
    267 """Coerce a non-tuple sequence object (e.g. list, ...)"""
    268 assert len(pattern_args) == 1
    269 return coerce_obj(
--> 270     [expand_and_coerce(o, pattern_args[0]) for o in obj_args], type_
    271 )

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:270, in <listcomp>(.0)
    267 """Coerce a non-tuple sequence object (e.g. list, ...)"""
    268 assert len(pattern_args) == 1
    269 return coerce_obj(
--> 270     [expand_and_coerce(o, pattern_args[0]) for o in obj_args], type_
    271 )

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:192, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
    188     raise TypeError(
    189         f"Could not coerce to {type_} as {obj} is not iterable{msg}"
    190     ) from e
    191 if issubclass(origin, tuple):
--> 192     return coerce_tuple(type_, obj_args, pattern_args)
    193 if issubclass(origin, ty.Iterable):
    194     return coerce_sequence(type_, obj_args, pattern_args)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:261, in TypeParser.coerce.<locals>.coerce_tuple(type_, obj_args, pattern_args)
    255 elif len(pattern_args) != len(obj_args):
    256     raise TypeError(
    257         f"Incorrect number of items in tuple, expected "
    258         f"{len(pattern_args)}, got {len(obj_args)}"
    259     )
    260 return coerce_obj(
--> 261     [expand_and_coerce(o, p) for o, p in zip(obj_args, pattern_args)], type_
    262 )

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:261, in <listcomp>(.0)
    255 elif len(pattern_args) != len(obj_args):
    256     raise TypeError(
    257         f"Incorrect number of items in tuple, expected "
    258         f"{len(pattern_args)}, got {len(obj_args)}"
    259     )
    260 return coerce_obj(
--> 261     [expand_and_coerce(o, p) for o, p in zip(obj_args, pattern_args)], type_
    262 )

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:167, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
    165     return attr.NOTHING
    166 if not isinstance(pattern, tuple):
--> 167     return coerce_basic(obj, pattern)
    168 origin, pattern_args = pattern
    169 if origin is ty.Union:

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:203, in TypeParser.coerce.<locals>.coerce_basic(obj, pattern)
    201 if self.is_instance(obj, pattern):
    202     return obj
--> 203 self.check_coercible(obj, pattern)
    204 return coerce_obj(obj, pattern)

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/utils/typing.py:470, in TypeParser.check_coercible(self, source, target)
    467         return t._name  # typing generics for Python < 3.10
    469 if not matches_criteria(self.coercible):
--> 470     raise TypeError(
    471         f"Cannot coerce {repr(source)} into {target} as the coercion doesn't match "
    472         f"any of the explicit inclusion criteria: "
    473         + ", ".join(
    474             f"{type_name(s)} -> {type_name(t)}" for s, t in self.coercible
    475         )
    476     )
    477 matches_not_coercible = matches_criteria(self.not_coercible)
    478 if matches_not_coercible:

TypeError: Cannot coerce '4' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool_ -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
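
The traceback above walks through the nested annotation level by level (list, then tuple, then int). A simplified sketch of such a recursive check can be written with typing.get_origin and typing.get_args. The function matches is a hypothetical helper that only handles plain classes, List[X] and fixed-length Tuple[X, Y]; it does not attempt coercion and does not handle variable-length tuples.

```python
from typing import List, Tuple, get_args, get_origin


def matches(value, pattern):
    """Return True if `value` conforms to the (possibly nested) type `pattern`."""
    origin = get_origin(pattern)
    if origin is None:                 # a plain class such as int or str
        return isinstance(value, pattern)
    args = get_args(pattern)
    if origin is list:                 # List[X]: every item must match X
        return isinstance(value, list) and all(matches(v, args[0]) for v in value)
    if origin is tuple:                # Tuple[X, Y]: fixed length, item by item
        return (
            isinstance(value, tuple)
            and len(value) == len(args)
            and all(matches(v, a) for v, a in zip(value, args))
        )
    raise NotImplementedError(f"pattern {pattern!r} is not handled in this sketch")


PairList = List[Tuple[int, int]]
print(matches([(1, 2), (3, 4)], PairList))    # True
print(matches([(1, 2), (3, "4")], PairList))  # False: "4" is not an int
print(matches([(1, 2, 3)], PairList))         # False: wrong tuple length
```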

2.2. Customizing output names

Note that “out” is the default name for the task output, but we can always customize it. There are two ways of doing so: using a Python function annotation, or using another pydra decorator.

Let’s start with the function annotation:

import typing as ty

@pydra.mark.task
def add_var_an(a: int, b: int) -> {'sum_a_b': int}:
    return a + b


task2a = add_var_an(a=4, b=5)
task2a()
Result(output=Output(sum_a_b=9), runtime=None, errored=False)

The annotation is particularly useful for specifying the output names when the function returns multiple values:

@pydra.mark.task
def modf_an(a: float) -> {'fractional': ty.Any, 'integer': ty.Any}:
    import math

    return math.modf(a)


task2b = modf_an(a=3.5)
task2b()
Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)
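
To illustrate how a dict-style return annotation can supply output names, the following sketch pairs the annotation keys with the returned values in order. The helper named_outputs and the function modf_plain are hypothetical, and this is not pydra's code; it only reproduces the name-to-value mapping seen in the output above.

```python
import math


def modf_plain(a: float) -> {"fractional": float, "integer": float}:
    return math.modf(a)


def named_outputs(func, *args, **kwargs):
    """Pair the names from a dict-style return annotation with returned values."""
    names = list(func.__annotations__["return"])
    values = func(*args, **kwargs)
    if len(names) == 1:
        values = (values,)  # a single output is not unpacked
    return dict(zip(names, values))


print(named_outputs(modf_plain, 3.5))  # {'fractional': 0.5, 'integer': 3.0}
```

Because dictionaries preserve insertion order, the order of the keys in the annotation decides which returned value gets which name.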

The second way of customizing the output requires another decorator, pydra.mark.annotate:

@pydra.mark.task
@pydra.mark.annotate({'return': {'fractional': ty.Any, 'integer': ty.Any}})
def modf(a: float):
    import math

    return math.modf(a)

task2c = modf(a=3.5)
task2c()
Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)

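Note that the order of the pydra decorators is important! Decorators are applied from the bottom up, so in the example above pydra.mark.annotate is applied first and pydra.mark.task last.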
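
Why decorator order can matter is easiest to see with toy decorators. In the sketch below, toy_mark_annotate and toy_mark_task are hypothetical stand-ins: the second reads the output names at the moment it is applied, so it only sees them if the first has already run. How pydra itself behaves with the order swapped is not claimed here.

```python
import math


def toy_mark_annotate(annotations):
    """Hypothetical decorator that attaches annotations to a function."""
    def decorator(func):
        func.__annotations__.update(annotations)
        return func
    return decorator


def toy_mark_task(func):
    """Hypothetical decorator that reads the output names when it is applied."""
    names = list(func.__annotations__.get("return", {"out": None}))

    def run(**inputs):
        values = func(**inputs)
        if len(names) == 1:
            values = (values,)
        return dict(zip(names, values))

    return run


RETURN_SPEC = {"return": {"fractional": float, "integer": float}}


@toy_mark_task                   # applied second: annotations are already set
@toy_mark_annotate(RETURN_SPEC)  # applied first
def modf_ok(a):
    return math.modf(a)


@toy_mark_annotate(RETURN_SPEC)  # applied second: too late for toy_mark_task
@toy_mark_task                   # applied first: sees no return annotation
def modf_swapped(a):
    return math.modf(a)


print(modf_ok(a=3.5))       # {'fractional': 0.5, 'integer': 3.0}
print(modf_swapped(a=3.5))  # {'out': (0.5, 3.0)}
```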

2.3. Setting the input

We don’t have to provide the input when we create a task; we can always set it later:

task3 = add_var()
task3.inputs.a = 4
task3.inputs.b = 5
task3()
Result(output=Output(out=9), runtime=None, errored=False)

If we don’t specify an input, attr.NOTHING will be used as its default value:

task3a = add_var()
task3a.inputs.a = 4

# import the attr library and check whether `b` is still unset
import attr

task3a.inputs.b == attr.NOTHING
True

And if we try to run the task, an error will be raised:

task3a()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[25], line 1
----> 1 task3a()

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:454, in TaskBase.__call__(self, submitter, plugin, plugin_kwargs, rerun, **kwargs)
    452         res = sub(self)
    453 else:  # tasks without state could be run without a submitter
--> 454     res = self._run(rerun=rerun, **kwargs)
    455 return res

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/core.py:529, in TaskBase._run(self, rerun, **kwargs)
    527 try:
    528     self.audit.monitor()
--> 529     self._run_task()
    530     result.output = self._collect_outputs(output_dir=output_dir)
    531 except Exception:

File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.11/site-packages/pydra/engine/task.py:202, in FunctionTask._run_task(self)
    200 del inputs["_func"]
    201 self.output_ = None
--> 202 output = cp.loads(self.inputs._func)(**inputs)
    203 output_names = [el[0] for el in self.output_spec.fields]
    204 if output is None:

Cell In[2], line 5, in add_var(a, b)
      3 @pydra.mark.task
      4 def add_var(a, b):
----> 5     return a + b

TypeError: unsupported operand type(s) for +: 'int' and '_Nothing'
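
The error above comes from the function body, because the unset input is passed through as a sentinel object. A sentinel is useful because it distinguishes "never set" from a legitimate value such as None. The sketch below shows the general pattern with a hypothetical sentinel and a hypothetical helper missing_inputs that reports unset inputs before running; it does not use the attr library.

```python
class _Nothing:
    """Hypothetical sentinel type marking an input that was never set."""

    def __repr__(self):
        return "NOTHING"


NOTHING = _Nothing()


def missing_inputs(inputs):
    """Return the names of inputs that still hold the sentinel."""
    return [name for name, value in inputs.items() if value is NOTHING]


inputs = {"a": 4, "b": NOTHING}
unset = missing_inputs(inputs)
if unset:
    print(f"cannot run yet, unset inputs: {unset}")
```

Note that the comparison uses `is`, so an input explicitly set to None is treated as set.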

2.4. Output directory and caching the results

After running the task, we can check where the output directory with the results was created:

task3.output_dir
PosixPath('/tmp/tmpgccjsdko/FunctionTask_c3a2dd2f4d8f08f1286f136c748dd2ed')

Within the directory you can find the file with the results: _result.pklz.

import os
os.listdir(task3.output_dir)
['_task.pklz', '_result.pklz']

But we can also provide the path where we want to store the results. If a path is provided for the cache directory, then pydra will use the cached results of a node instead of recomputing the result. Let’s create a temporary directory and a specific subdirectory “task4”:

from tempfile import mkdtemp
from pathlib import Path
cache_dir_tmp = Path(mkdtemp()) / 'task4'
print(cache_dir_tmp)
/tmp/tmpqm3swbu2/task4

Now we can pass this path to the cache_dir argument of FunctionTask. To observe the execution time, we define a function that sleeps for 5 s:

@pydra.mark.task
def add_var_wait(a: int, b: int):
    import time

    time.sleep(5)
    return a + b

task4 = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)

If you’re running the cell for the first time, it should take around 5 s.

task4()
task4.result()
Result(output=Output(out=10), runtime=None, errored=False)

We can check the output_dir of our task: it should contain the path of cache_dir_tmp, and its last part contains the name of the task class, FunctionTask, and the task checksum:

task4.output_dir
PosixPath('/tmp/tmpqm3swbu2/task4/FunctionTask_3f42c96bdca5034892978e28d2f91773')

Let’s see what happens when we define an identical task again with the same cache_dir:

task4a = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
task4a()
Result(output=Output(out=10), runtime=None, errored=False)

This time the result should be ready right away! pydra uses the available results and does not recompute the task.

pydra checks for results not only in cache_dir; you can also provide a list of other locations that should be checked. Let’s create another directory that will be used as cache_dir, and pass the previous working directory in cache_locations:

cache_dir_tmp_new = Path(mkdtemp()) / 'task4b'

task4b = add_var_wait(
    a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4b()
Result(output=Output(out=10), runtime=None, errored=False)

This time the result should also be returned quickly! We can also check that task4b.output_dir was not created:

task4b.output_dir.exists()
False
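
The lookup behaviour described above (check cache_dir first, then each entry of cache_locations, and write nothing new on a hit) can be sketched with the standard library alone. The functions find_cached and run_cached, the fixed directory name "toy_task" and the file name _result.pkl are all hypothetical; in pydra the directory name includes the task checksum.

```python
import pickle
import tempfile
from pathlib import Path


def find_cached(name, cache_dir, cache_locations=()):
    """Return the first existing result file for `name`, or None."""
    for location in (cache_dir, *cache_locations):
        candidate = Path(location) / name / "_result.pkl"
        if candidate.exists():
            return candidate
    return None


def run_cached(func, name, cache_dir, cache_locations=()):
    """Reuse a stored result if one exists anywhere; otherwise compute and store."""
    hit = find_cached(name, cache_dir, cache_locations)
    if hit is not None:
        return pickle.loads(hit.read_bytes()), True
    result = func()
    out_dir = Path(cache_dir) / name
    out_dir.mkdir(parents=True)
    (out_dir / "_result.pkl").write_bytes(pickle.dumps(result))
    return result, False


old_cache = Path(tempfile.mkdtemp()) / "old"
new_cache = Path(tempfile.mkdtemp()) / "new"

first = run_cached(lambda: 4 + 6, "toy_task", old_cache)
second = run_cached(lambda: 4 + 6, "toy_task", new_cache, [old_cache])
print(first)                               # (10, False): computed and stored
print(second)                              # (10, True): reused from old_cache
print((new_cache / "toy_task").exists())   # False: nothing was written
```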

If you want to rerun the task even though the results already exist, you can set rerun to True. The task will take several seconds and a new output_dir will be created:

cache_dir_tmp_new = Path(mkdtemp()) / 'task4c'

task4c = add_var_wait(
    a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4c(rerun=True)

task4c.output_dir.exists()
True

If we update the input of the task and run it again, a new directory will be created and the task will be recomputed:

task4b.inputs.a = 1
print(task4b())
print(task4b.output_dir.exists())
Result(output=Output(out=7), runtime=None, errored=False)
True

and when we check the output_dir, we can see that it’s different from last time:

task4b.output_dir
PosixPath('/tmp/tmpk0srwvs8/task4b/FunctionTask_db43fd653c264ada9ddc5dae0774d20c')

This is because the checksum changes when we change either the input or the function.
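
As a rough illustration of a checksum that depends on both the function and its inputs, the sketch below hashes the function's bytecode and constants together with the input values. The helper toy_checksum is hypothetical and is not pydra's hashing scheme; it assumes JSON-serializable inputs.

```python
import hashlib
import json


def toy_checksum(func, inputs):
    """Hash the function's bytecode and constants together with its inputs."""
    h = hashlib.sha256()
    h.update(func.__code__.co_code)
    h.update(repr(func.__code__.co_consts).encode())
    h.update(json.dumps(inputs, sort_keys=True).encode())
    return h.hexdigest()[:32]


def add_v1(a, b):
    return a + b


def add_v2(a, b):
    return a + b + 1


base = toy_checksum(add_v1, {"a": 4, "b": 6})
print("FunctionTask_" + base)
print(base == toy_checksum(add_v1, {"b": 6, "a": 4}))  # True: same inputs
print(base == toy_checksum(add_v1, {"a": 1, "b": 6}))  # False: changed input
print(base == toy_checksum(add_v2, {"a": 4, "b": 6}))  # False: changed function
```

Using such a checksum as part of the directory name is what lets a cache tell apart runs that would otherwise look identical by task name alone.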

2.4.1. Exercise 1

Create a task that takes a list of numbers as input and returns two fields: mean, with the mean value, and std, with the standard deviation.

@pydra.mark.task
@pydra.mark.annotate({'return': {'mean': ty.Any, 'std': ty.Any}})
def mean_dev(my_list: List):
    import statistics as st

    return st.mean(my_list), st.stdev(my_list)

my_task = mean_dev(my_list=[2, 2, 2])
my_task()
my_task.result()
Result(output=Output(mean=2, std=0.0), runtime=None, errored=False)
# write your solution here (you can use statistics module)
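
When working on this exercise it may help to know that the statistics module offers two standard deviations: stdev computes the sample standard deviation and pstdev the population standard deviation. The short example below, with an arbitrary list chosen for illustration, shows the difference.

```python
import statistics as st

values = [1, 2, 3, 4]
print(st.mean(values))    # 2.5
print(st.stdev(values))   # sample standard deviation (divides by n - 1)
print(st.pstdev(values))  # population standard deviation (divides by n)
```

Note that st.stdev needs at least two data points and raises statistics.StatisticsError otherwise.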

2.5. Using Audit

pydra can record various run-time information, including the workflow provenance, by setting audit_flags and the type of messengers.

AuditFlag.RESOURCE allows you to monitor resource usage for the Task, while AuditFlag.PROV tracks the provenance of the Task.

from pydra.utils.messenger import AuditFlag, PrintMessenger

task5 = add_var(a=4, b=5, audit_flags=AuditFlag.RESOURCE)
task5()
task5.result()
Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.10631561328125, vms_peak_gb=0.9318733212890625, cpu_peak_percent=247.5), errored=False)
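
To give a feel for what monitoring a single call involves, here is a standard-library sketch that wraps a function call and records its wall time and peak memory. The helper run_with_usage is hypothetical and is not pydra's monitor: tracemalloc only tracks memory allocated by Python, so its numbers are not comparable to the rss_peak_gb and vms_peak_gb values above.

```python
import time
import tracemalloc


def run_with_usage(func, **inputs):
    """Run `func` and report wall time and peak Python-level memory."""
    tracemalloc.start()
    started = time.perf_counter()
    try:
        output = func(**inputs)
    finally:
        elapsed = time.perf_counter() - started
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    return output, {"elapsed_s": elapsed, "peak_python_mb": peak_bytes / 1e6}


def add_plain(a, b):
    return a + b


output, usage = run_with_usage(add_plain, a=4, b=5)
print(output)
print(sorted(usage))
```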

One can turn on both audit flags using AuditFlag.ALL, and print the messages on the terminal using the PrintMessenger.

task5 = add_var(
    a=4, b=5, audit_flags=AuditFlag.ALL, messengers=PrintMessenger()
)
task5()
task5.result()
id: 12044c5660c34c0ba99f19556758dda0
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:c9be22ea8057457e9e67a664d4bb8820",
  "@type": "task",
  "startedAtTime": "2023-10-17T01:16:48.268073",
  "executedBy": "uid:aebfcf9b75f140ef819dbf29363d7b3d"
}
id: d1a3bd088a2b408ba4ad723b00e2296a
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:c9be22ea8057457e9e67a664d4bb8820",
  "@type": "task",
  "Label": "add_var",
  "Command": null,
  "StartedAtTime": "2023-10-17T01:16:48.473775",
  "AssociatedWith": null
}
id: 5d9aae9e266848d6b7bf33b130ca0c58
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:a0cb069945ca4a218c7a2c53dbf2dc86",
  "@type": "monitor",
  "startedAtTime": "2023-10-17T01:16:48.474343",
  "wasStartedBy": "uid:c9be22ea8057457e9e67a664d4bb8820"
}
id: 1ac55861990243e69606c502a96af1e2
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:a0cb069945ca4a218c7a2c53dbf2dc86",
  "endedAtTime": "2023-10-17T01:16:48.480454",
  "wasEndedBy": "uid:c9be22ea8057457e9e67a664d4bb8820"
}
id: a5f28a1da9ac47d48b1c089a67dd8721
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "rss_peak_gb": 0.1045722958984375,
  "vms_peak_gb": 0.9318733212890625,
  "cpu_peak_percent": 209.4,
  "@id": "uid:691bac5b2d3646fbab79d3cc4c9cc896",
  "@type": "runtime",
  "prov:wasGeneratedBy": "uid:c9be22ea8057457e9e67a664d4bb8820"
}
id: ee03402db48942978a0eaeeb02562932
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@type": "prov:Generation",
  "entity_generated": "uid:691bac5b2d3646fbab79d3cc4c9cc896",
  "hadActivity": "uid:a0cb069945ca4a218c7a2c53dbf2dc86"
}
id: 14d62803aca246839b6f49e7c578051c
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:c9be22ea8057457e9e67a664d4bb8820",
  "endedAtTime": "2023-10-17T01:16:48.480634",
  "errored": false
}
Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.1045722958984375, vms_peak_gb=0.9318733212890625, cpu_peak_percent=209.4), errored=False)
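
The messages printed above share a pattern: each record carries an "@id", and the start and end of the task are reported as separate records with the same identifier. A minimal sketch of producing such paired records is shown below. The helpers new_uid and run_with_provenance are hypothetical, only a few of the field names seen above are reused, and no "@context" is emitted, so this is not valid output of pydra's messengers.

```python
import json
import uuid
from datetime import datetime, timezone


def new_uid():
    """Build an identifier in the 'uid:<hex>' style seen in the messages above."""
    return "uid:" + uuid.uuid4().hex


def run_with_provenance(func, **inputs):
    """Run `func` and return its output plus start and end records."""
    task_id = new_uid()
    records = [{
        "@id": task_id,
        "@type": "task",
        "Label": func.__name__,
        "startedAtTime": datetime.now(timezone.utc).isoformat(),
    }]
    errored = False
    output = None
    try:
        output = func(**inputs)
    except Exception:
        errored = True  # simplification: the error is recorded, not re-raised
    records.append({
        "@id": task_id,
        "endedAtTime": datetime.now(timezone.utc).isoformat(),
        "errored": errored,
    })
    return output, records


def add_plain(a, b):
    return a + b


output, records = run_with_provenance(add_plain, a=4, b=5)
for record in records:
    print(json.dumps(record, indent=2))
```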