2. FunctionTask
import nest_asyncio
nest_asyncio.apply()
A FunctionTask is a Task that can be created from any Python function by using the pydra decorator pydra.mark.task:
import pydra
@pydra.mark.task
def add_var(a, b):
    return a + b
Once the function is decorated, we can create a pydra Task and specify its inputs:
task0 = add_var(a=4, b=5)
We can check the type of task0:
type(task0)
pydra.engine.task.FunctionTask
and we can check whether the task has the correct values of a and b; they should be saved in the task inputs:
print(f'a = {task0.inputs.a}')
print(f'b = {task0.inputs.b}')
a = 4
b = 5
We can also check the content of the entire inputs:
task0.inputs
Inputs(a=4, b=5, _func=b'\x80\x05\x95\xcf\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x00K\x02K\x02K\x03C\n\x95\x00X\x01-\x00\x00\x00$\x00\x94N\x85\x94)\x8c\x01a\x94\x8c\x01b\x94\x86\x94\x8c!/tmp/ipykernel_2291/3542708107.py\x94\x8c\x07add_var\x94\x8c\x07add_var\x94K\x03C\t\x80\x00\xe0\x0b\x0c\x895\x80L\x94C\x00\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94uNNNt\x94R\x94h\x00\x8c\x12_function_setstate\x94\x93\x94h\x19}\x94}\x94(h\x16\x8c\x07add_var\x94\x8c\x0c__qualname__\x94\x8c\x07add_var\x94\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x17\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.')
As you can see, task.inputs also contains the function itself (serialized with cloudpickle and stored in the _func field), which is an inseparable part of the FunctionTask.
Once the task has its inputs set, we can run it. Since a Task is a “callable object”, we can use the syntax:
task0()
Result(output=Output(out=9), runtime=None, errored=False)
As you can see, the result was returned right away, but we can also access it later:
task0.result()
Result(output=Output(out=9), runtime=None, errored=False)
Result contains more than just the output, so to get the task output itself, we can type:
result = task0.result()
result.output.out
9
And if we want to see the inputs that were used in the task, we can set the optional argument return_inputs to True:
task0.result(return_inputs=True)
({'add_var.a': 4, 'add_var.b': 5},
Result(output=Output(out=9), runtime=None, errored=False))
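To make the create-then-call pattern above concrete, here is a minimal, hypothetical sketch in plain Python of what a task decorator does conceptually: it turns a function into a factory that stores the inputs, and the resulting object runs the function when called and keeps the result. The names mini_task, MiniTask and MiniResult are invented for this illustration; this is not pydra's implementation, which additionally handles type checking, hashing, caching and auditing.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class MiniResult:
    # hypothetical stand-in for pydra's Result: only the output and an error flag
    output: Any
    errored: bool = False


@dataclass
class MiniTask:
    func: Callable
    inputs: Dict[str, Any] = field(default_factory=dict)
    _result: Optional[MiniResult] = None

    def __call__(self) -> MiniResult:
        # run the wrapped function with the stored inputs and keep the result
        self._result = MiniResult(output=self.func(**self.inputs))
        return self._result

    def result(self, return_inputs: bool = False):
        # return the stored result, optionally paired with the inputs that were used
        if return_inputs:
            named = {f"{self.func.__name__}.{k}": v for k, v in self.inputs.items()}
            return named, self._result
        return self._result


def mini_task(func: Callable) -> Callable[..., MiniTask]:
    # the decorator returns a factory: calling it creates a task, it does not run func
    def factory(**kwargs: Any) -> MiniTask:
        return MiniTask(func=func, inputs=kwargs)
    return factory


@mini_task
def add_var(a, b):
    return a + b


t = add_var(a=4, b=5)
print(t())  # MiniResult(output=9, errored=False)
print(t.result(return_inputs=True))  # ({'add_var.a': 4, 'add_var.b': 5}, MiniResult(output=9, errored=False))
```

The design point the sketch shares with the tutorial is the separation between defining the task (storing inputs) and running it (calling the object).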
2.1. Type-checking
2.1.1. What is Type-checking?
Type-checking is the process of verifying the types of values, either before the code runs (static checking) or while it runs (dynamic checking). It ensures that operations on values and assignments to variables are semantically meaningful and can be executed without type errors, which improves code reliability and maintainability.
2.1.2. Why Use Type-checking?
Error Prevention: Type-checking helps catch type mismatches early, preventing potential runtime errors.
Improved Readability: Type annotations make it easier to see what types of values a function expects and returns.
Better Documentation: Explicitly stating expected types acts as inline documentation, simplifying code collaboration and review.
Potential Performance Gains: Some compilers can use explicitly specified types for optimization, although the standard Python interpreter does not use annotations this way.
2.1.3. How is Type-checking Implemented in Pydra?
2.1.3.1. Static Type-Checking
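Static type-checking relies on Python’s type annotations: you annotate the types of your function arguments and of the return value, and then use a tool such as mypy to check, without running the code, that the function is used according to those annotations. pydra also reads these annotations and checks (and, where allowed, coerces) the inputs when a task is created, so the incorrect usage below fails with a TypeError as soon as the task is constructed, before it is run.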
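Python itself does not enforce annotations at run time; they are stored as metadata that tools such as mypy, or pydra, can read. A minimal plain-Python illustration (no pydra involved) shows both sides:

```python
import typing


def add(a: int, b: int) -> int:
    return a + b


# The annotations are available as metadata at run time...
hints = typing.get_type_hints(add)
print(hints)  # {'a': <class 'int'>, 'b': <class 'int'>, 'return': <class 'int'>}

# ...but the interpreter does not check them: the strings are simply concatenated.
print(add("hello", "world"))  # helloworld
```

A static checker such as mypy would report the second call as an error without running it; pydra, in contrast, performs its check when the task object is built.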
@pydra.mark.task
def add(a: int, b: int) -> int:
    return a + b
# This usage is correct according to static type hints:
task1a = add(a=5, b=3)
task1a()
Result(output=Output(out=8), runtime=None, errored=False)
# This usage is incorrect according to static type hints:
task1b = add(a="hello", b="world")
task1b()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[13], line 2
1 # This usage is incorrect according to static type hints:
----> 2 task1b = add(a="hello", b="world")
3 task1b()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/mark/functions.py:46, in task.<locals>.decorate(**kwargs)
44 @wraps(func)
45 def decorate(**kwargs):
---> 46 return FunctionTask(func=func, **kwargs)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/task.py:155, in FunctionTask.__init__(self, func, audit_flags, cache_dir, cache_locations, input_spec, cont_dim, messenger_args, messengers, name, output_spec, rerun, **kwargs)
153 if name is None:
154 name = func.__name__
--> 155 super().__init__(
156 name,
157 inputs=kwargs,
158 cont_dim=cont_dim,
159 audit_flags=audit_flags,
160 messengers=messengers,
161 messenger_args=messenger_args,
162 cache_dir=cache_dir,
163 cache_locations=cache_locations,
164 rerun=rerun,
165 )
166 if output_spec is None:
167 name = "Output"
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:175, in TaskBase.__init__(self, name, audit_flags, cache_dir, cache_locations, inputs, cont_dim, messenger_args, messengers, rerun)
172 raise ValueError(f"Unknown input set {inputs!r}")
173 inputs = self._input_sets[inputs]
--> 175 self.inputs = attr.evolve(self.inputs, **inputs)
177 # checking if metadata is set properly
178 self.inputs.check_metadata()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/attr/_make.py:624, in evolve(*args, **changes)
621 if init_name not in changes:
622 changes[init_name] = getattr(inst, attr_name)
--> 624 return cls(**changes)
File <attrs generated init pydra.engine.helpers.Inputs>:3, in __init__(self, a, b, func)
1 def __init__(self, *, a, b, func=attr_dict['_func'].default):
2 _setattr = _cached_setattr_get(self)
----> 3 _setattr('a', __attr_converter_a(a))
4 _setattr('b', __attr_converter_b(b))
5 _setattr('_func', __attr_converter__func(func))
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:154, in TypeParser.__call__(self, obj)
152 coerced = StateArray(self(o) for o in obj) # type: ignore[assignment]
153 else:
--> 154 coerced = self.coerce(obj)
155 return coerced
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:294, in TypeParser.coerce(self, object_)
287 msg = (
288 f" (part of coercion from {object_} to {self.pattern}"
289 if obj is not object_
290 else ""
291 )
292 raise TypeError(f"Cannot coerce {obj!r} into {type_}{msg}") from e
--> 294 return expand_and_coerce(object_, self.pattern)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:167, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
165 return attr.NOTHING
166 if not isinstance(pattern, tuple):
--> 167 return coerce_basic(obj, pattern)
168 origin, pattern_args = pattern
169 if origin is ty.Union:
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:203, in TypeParser.coerce.<locals>.coerce_basic(obj, pattern)
201 if self.is_instance(obj, pattern):
202 return obj
--> 203 self.check_coercible(obj, pattern)
204 return coerce_obj(obj, pattern)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:470, in TypeParser.check_coercible(self, source, target)
467 return t._name # typing generics for Python < 3.10
469 if not matches_criteria(self.coercible):
--> 470 raise TypeError(
471 f"Cannot coerce {repr(source)} into {target} as the coercion doesn't match "
472 f"any of the explicit inclusion criteria: "
473 + ", ".join(
474 f"{type_name(s)} -> {type_name(t)}" for s, t in self.coercible
475 )
476 )
477 matches_not_coercible = matches_criteria(self.not_coercible)
478 if matches_not_coercible:
TypeError: Cannot coerce 'hello' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
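The TypeError above is raised by pydra’s own type parser while the task is being created, before anything runs. A much simplified, hypothetical sketch of that idea checks keyword inputs against the function’s annotations when they are bound. check_inputs is invented for this illustration: it only handles plain classes (no generics) and, unlike pydra, performs no coercion such as int to float.

```python
import typing
from typing import Any, Callable, Dict


def check_inputs(func: Callable, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Raise TypeError if an input does not match a plain-class annotation."""
    hints = typing.get_type_hints(func)
    for name, value in inputs.items():
        expected = hints.get(name)
        # only check annotations that are ordinary classes, e.g. int or str
        if isinstance(expected, type) and not isinstance(value, expected):
            raise TypeError(
                f"Cannot use {value!r} for {name!r}: expected {expected.__name__}"
            )
    return inputs


def add(a: int, b: int) -> int:
    return a + b


check_inputs(add, {"a": 5, "b": 3})  # passes silently
try:
    check_inputs(add, {"a": "hello", "b": "world"})
except TypeError as err:
    print(err)  # Cannot use 'hello' for 'a': expected int
```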
2.1.3.2. Dynamic Type-Checking
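Dynamic type-checking is done at run time, with explicit checks such as isinstance inside the function body. Add such checks if you want to enforce types when the function is executed. Since the function below has no annotations, pydra accepts any inputs when the task is created, and the error is raised only when the task runs.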
@pydra.mark.task
def add(a, b):
    if not (isinstance(a, int) and isinstance(b, int)):
        raise TypeError("Both inputs should be integers.")
    return a + b
# This usage is correct and will not raise a runtime error:
task1c = add(a=5, b=3)
task1c()
Result(output=Output(out=8), runtime=None, errored=False)
# This usage is incorrect and will raise a runtime TypeError:
task1d = add(a="hello", b="world")
task1d()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[16], line 3
1 # This usage is incorrect and will raise a runtime TypeError:
2 task1d = add(a="hello", b="world")
----> 3 task1d()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:454, in TaskBase.__call__(self, submitter, plugin, plugin_kwargs, rerun, **kwargs)
452 res = sub(self)
453 else: # tasks without state could be run without a submitter
--> 454 res = self._run(rerun=rerun, **kwargs)
455 return res
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:529, in TaskBase._run(self, rerun, **kwargs)
527 try:
528 self.audit.monitor()
--> 529 self._run_task()
530 result.output = self._collect_outputs(output_dir=output_dir)
531 except Exception:
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/task.py:202, in FunctionTask._run_task(self)
200 del inputs["_func"]
201 self.output_ = None
--> 202 output = cp.loads(self.inputs._func)(**inputs)
203 output_names = [el[0] for el in self.output_spec.fields]
204 if output is None:
Cell In[14], line 4, in add(a, b)
1 @pydra.mark.task
2 def add(a, b):
3 if not (isinstance(a, int) and isinstance(b, int)):
----> 4 raise TypeError("Both inputs should be integers.")
5 return a + b
TypeError: Both inputs should be integers.
2.1.3.3. Checking Complex Types
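For more complex types such as lists, dictionaries, or custom objects, we can combine type hints with dynamic checks. Note that pydra checks the annotated types first, when the task is created: in the incorrect example below, the string "4" is rejected with a TypeError before the function’s own ValueError check is ever reached.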
from typing import List, Tuple
@pydra.mark.task
def sum_of_pairs(pairs: List[Tuple[int, int]]) -> List[int]:
    # dynamic check: every element must be a tuple of exactly two integers
    if not all(
        isinstance(pair, tuple) and len(pair) == 2 and all(isinstance(x, int) for x in pair)
        for pair in pairs
    ):
        raise ValueError("Input should be a list of pairs (tuples with 2 integers each).")
    return [sum(pair) for pair in pairs]
# Correct usage
task1e = sum_of_pairs(pairs=[(1, 2), (3, 4)])
task1e()
Result(output=Output(out=[3, 7]), runtime=None, errored=False)
# This raises a TypeError when the task is created (pydra cannot coerce "4" into int)
task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])
task1f()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[19], line 2
1 # This will raise a ValueError
----> 2 task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])
3 task1f()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/mark/functions.py:46, in task.<locals>.decorate(**kwargs)
44 @wraps(func)
45 def decorate(**kwargs):
---> 46 return FunctionTask(func=func, **kwargs)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/task.py:155, in FunctionTask.__init__(self, func, audit_flags, cache_dir, cache_locations, input_spec, cont_dim, messenger_args, messengers, name, output_spec, rerun, **kwargs)
153 if name is None:
154 name = func.__name__
--> 155 super().__init__(
156 name,
157 inputs=kwargs,
158 cont_dim=cont_dim,
159 audit_flags=audit_flags,
160 messengers=messengers,
161 messenger_args=messenger_args,
162 cache_dir=cache_dir,
163 cache_locations=cache_locations,
164 rerun=rerun,
165 )
166 if output_spec is None:
167 name = "Output"
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:175, in TaskBase.__init__(self, name, audit_flags, cache_dir, cache_locations, inputs, cont_dim, messenger_args, messengers, rerun)
172 raise ValueError(f"Unknown input set {inputs!r}")
173 inputs = self._input_sets[inputs]
--> 175 self.inputs = attr.evolve(self.inputs, **inputs)
177 # checking if metadata is set properly
178 self.inputs.check_metadata()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/attr/_make.py:624, in evolve(*args, **changes)
621 if init_name not in changes:
622 changes[init_name] = getattr(inst, attr_name)
--> 624 return cls(**changes)
File <attrs generated init pydra.engine.helpers.Inputs-1>:3, in __init__(self, pairs, func)
1 def __init__(self, *, pairs, func=attr_dict['_func'].default):
2 _setattr = _cached_setattr_get(self)
----> 3 _setattr('pairs', __attr_converter_pairs(pairs))
4 _setattr('_func', __attr_converter__func(func))
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:154, in TypeParser.__call__(self, obj)
152 coerced = StateArray(self(o) for o in obj) # type: ignore[assignment]
153 else:
--> 154 coerced = self.coerce(obj)
155 return coerced
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:294, in TypeParser.coerce(self, object_)
287 msg = (
288 f" (part of coercion from {object_} to {self.pattern}"
289 if obj is not object_
290 else ""
291 )
292 raise TypeError(f"Cannot coerce {obj!r} into {type_}{msg}") from e
--> 294 return expand_and_coerce(object_, self.pattern)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:194, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
192 return coerce_tuple(type_, obj_args, pattern_args)
193 if issubclass(origin, ty.Iterable):
--> 194 return coerce_sequence(type_, obj_args, pattern_args)
195 assert False, f"Coercion from {obj} to {pattern} is not handled"
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:270, in TypeParser.coerce.<locals>.coerce_sequence(type_, obj_args, pattern_args)
267 """Coerce a non-tuple sequence object (e.g. list, ...)"""
268 assert len(pattern_args) == 1
269 return coerce_obj(
--> 270 [expand_and_coerce(o, pattern_args[0]) for o in obj_args], type_
271 )
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:192, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
188 raise TypeError(
189 f"Could not coerce to {type_} as {obj} is not iterable{msg}"
190 ) from e
191 if issubclass(origin, tuple):
--> 192 return coerce_tuple(type_, obj_args, pattern_args)
193 if issubclass(origin, ty.Iterable):
194 return coerce_sequence(type_, obj_args, pattern_args)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:261, in TypeParser.coerce.<locals>.coerce_tuple(type_, obj_args, pattern_args)
255 elif len(pattern_args) != len(obj_args):
256 raise TypeError(
257 f"Incorrect number of items in tuple, expected "
258 f"{len(pattern_args)}, got {len(obj_args)}"
259 )
260 return coerce_obj(
--> 261 [expand_and_coerce(o, p) for o, p in zip(obj_args, pattern_args)], type_
262 )
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:167, in TypeParser.coerce.<locals>.expand_and_coerce(obj, pattern)
165 return attr.NOTHING
166 if not isinstance(pattern, tuple):
--> 167 return coerce_basic(obj, pattern)
168 origin, pattern_args = pattern
169 if origin is ty.Union:
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:203, in TypeParser.coerce.<locals>.coerce_basic(obj, pattern)
201 if self.is_instance(obj, pattern):
202 return obj
--> 203 self.check_coercible(obj, pattern)
204 return coerce_obj(obj, pattern)
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/utils/typing.py:470, in TypeParser.check_coercible(self, source, target)
467 return t._name # typing generics for Python < 3.10
469 if not matches_criteria(self.coercible):
--> 470 raise TypeError(
471 f"Cannot coerce {repr(source)} into {target} as the coercion doesn't match "
472 f"any of the explicit inclusion criteria: "
473 + ", ".join(
474 f"{type_name(s)} -> {type_name(t)}" for s, t in self.coercible
475 )
476 )
477 matches_not_coercible = matches_criteria(self.not_coercible)
478 if matches_not_coercible:
TypeError: Cannot coerce '4' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
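Because sum_of_pairs is annotated with List[Tuple[int, int]], pydra walks the nested type and rejects the string '4' at task creation. A minimal recursive checker for such nested annotations can be sketched with typing.get_origin and typing.get_args. The matches helper is hypothetical: it covers only lists, fixed-length tuples and plain classes, and it is not pydra’s algorithm.

```python
from typing import Any, List, Tuple, get_args, get_origin


def matches(obj: Any, tp: Any) -> bool:
    """Return True if obj conforms to a (possibly nested) list/tuple annotation."""
    origin = get_origin(tp)
    if origin is None:
        # plain class such as int or str; Any accepts everything
        return tp is Any or isinstance(obj, tp)
    args = get_args(tp)
    if origin is list:
        return isinstance(obj, list) and all(matches(o, args[0]) for o in obj)
    if origin is tuple:
        # fixed-length tuple such as Tuple[int, int]
        return (
            isinstance(obj, tuple)
            and len(obj) == len(args)
            and all(matches(o, a) for o, a in zip(obj, args))
        )
    raise NotImplementedError(f"unsupported annotation: {tp}")


pair_list = List[Tuple[int, int]]
print(matches([(1, 2), (3, 4)], pair_list))    # True
print(matches([(1, 2), (3, "4")], pair_list))  # False
```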
2.2. Customizing output names
Note that “out” is the default name of the task output, but it can be customized. There are two ways of doing it: using a Python function annotation, or using another pydra decorator.
Let’s start with the function annotation:
import typing as ty
@pydra.mark.task
def add_var_an(a: int, b: int) -> {'sum_a_b': int}:
    return a + b
task2a = add_var_an(a=4, b=5)
task2a()
Result(output=Output(sum_a_b=9), runtime=None, errored=False)
The annotation is particularly useful for naming the outputs when the function returns multiple values:
@pydra.mark.task
def modf_an(a: float) -> {'fractional': ty.Any, 'integer': ty.Any}:
    import math
    return math.modf(a)
task2b = modf_an(a=3.5)
task2b()
Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)
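Conceptually, a return annotation written as a dictionary is ordinary function metadata, so the output names can be read back and paired with the returned values. The helper name_outputs below is a hypothetical sketch of that pairing; pydra’s own logic for building the Output object is more involved.

```python
import math
import typing as ty


def modf_an(a: float) -> {'fractional': ty.Any, 'integer': ty.Any}:
    return math.modf(a)


def name_outputs(func, value):
    """Pair return values with the names from a dict-style return annotation."""
    spec = func.__annotations__.get("return")
    if not isinstance(spec, dict):
        return {"out": value}  # fall back to a single output called "out"
    names = list(spec)
    if len(names) == 1:
        return {names[0]: value}
    if len(names) != len(value):
        raise ValueError(f"expected {len(names)} values, got {len(value)}")
    return dict(zip(names, value))  # one name per returned element


print(name_outputs(modf_an, modf_an(3.5)))  # {'fractional': 0.5, 'integer': 3.0}
```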
The second way of customizing the output requires another decorator, pydra.mark.annotate:
@pydra.mark.task
@pydra.mark.annotate({'return': {'fractional': ty.Any, 'integer': ty.Any}})
def modf(a: float):
    import math
    return math.modf(a)
task2c = modf(a=3.5)
task2c()
Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)
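Note that the order of the pydra decorators is important! Decorators are applied from the bottom up, so pydra.mark.annotate must be placed below pydra.mark.task: the output names are then attached to the function before the task is created.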
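The ordering rule can be seen without pydra. In this hypothetical sketch, set_annotations stands in for an annotate-style decorator that merges annotations into the function, and show_outputs stands in for a task decorator that reads the return annotation at decoration time. Both names are invented; the sketch only illustrates Python’s bottom-up decorator order, not pydra’s code.

```python
def set_annotations(annotation):
    """Hypothetical annotate-style decorator: merge annotations into a function."""
    def decorate(func):
        func.__annotations__.update(annotation)
        return func
    return decorate


def show_outputs(func):
    """Stand-in for a task decorator: records the return annotation it sees."""
    func.seen_return = func.__annotations__.get("return")
    return func


@show_outputs  # applied second (outermost): sees the merged annotation
@set_annotations({"return": {"fractional": float, "integer": float}})
def modf_good(a: float):
    import math
    return math.modf(a)


@set_annotations({"return": {"fractional": float, "integer": float}})
@show_outputs  # applied first: the return annotation is not set yet
def modf_bad(a: float):
    import math
    return math.modf(a)


print(modf_good.seen_return)  # {'fractional': <class 'float'>, 'integer': <class 'float'>}
print(modf_bad.seen_return)   # None
```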
2.3. Setting the input
We don’t have to provide the inputs when we create a task; we can always set them later:
task3 = add_var()
task3.inputs.a = 4
task3.inputs.b = 5
task3()
Result(output=Output(out=9), runtime=None, errored=False)
If we don’t specify an input, attr.NOTHING will be used as its default value:
task3a = add_var()
task3a.inputs.a = 4
# import the attr library and check that `b` is still unset (equal to attr.NOTHING)
import attr
task3a.inputs.b == attr.NOTHING
True
And if we try to run the task, an error will be raised:
task3a()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[25], line 1
----> 1 task3a()
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:454, in TaskBase.__call__(self, submitter, plugin, plugin_kwargs, rerun, **kwargs)
452 res = sub(self)
453 else: # tasks without state could be run without a submitter
--> 454 res = self._run(rerun=rerun, **kwargs)
455 return res
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/core.py:529, in TaskBase._run(self, rerun, **kwargs)
527 try:
528 self.audit.monitor()
--> 529 self._run_task()
530 result.output = self._collect_outputs(output_dir=output_dir)
531 except Exception:
File /usr/share/miniconda/envs/pydra-tutorial/lib/python3.13/site-packages/pydra/engine/task.py:202, in FunctionTask._run_task(self)
200 del inputs["_func"]
201 self.output_ = None
--> 202 output = cp.loads(self.inputs._func)(**inputs)
203 output_names = [el[0] for el in self.output_spec.fields]
204 if output is None:
Cell In[2], line 5, in add_var(a, b)
3 @pydra.mark.task
4 def add_var(a, b):
----> 5 return a + b
TypeError: unsupported operand type(s) for +: 'int' and '_Nothing'
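The error above (unsupported operand type(s) ... '_Nothing') only appears once the function body runs. A minimal sketch of checking for unset inputs before running is shown below. It uses a hypothetical sentinel object, UNSET, in place of attr.NOTHING, and the helpers missing_inputs and run_checked are invented for this illustration.

```python
from typing import Any, Dict, List


class _Unset:
    """Hypothetical sentinel marking an input that has not been set."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def missing_inputs(inputs: Dict[str, Any]) -> List[str]:
    # compare by identity: the sentinel is a single shared object
    return [name for name, value in inputs.items() if value is UNSET]


def run_checked(func, inputs: Dict[str, Any]):
    missing = missing_inputs(inputs)
    if missing:
        raise ValueError(f"Inputs not set: {', '.join(missing)}")
    return func(**inputs)


def add_var(a, b):
    return a + b


inputs = {"a": 4, "b": UNSET}
try:
    run_checked(add_var, inputs)
except ValueError as err:
    print(err)  # Inputs not set: b
inputs["b"] = 5
print(run_checked(add_var, inputs))  # 9
```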
2.4. Output directory and caching the results
After running the task, we can check where the output directory with the results was created:
task3.output_dir
PosixPath('/tmp/tmpfo5ssr1m/FunctionTask_0ab49877e31f8ae515d5f2431b40cdab')
Within the directory you can find the file with the results, _result.pklz.
import os
os.listdir(task3.output_dir)
['_task.pklz', '_result.pklz']
We can also provide the path where the results should be stored. When a cache directory is given, pydra looks there for the results of an identical task (same function and inputs) and, if it finds them, reuses them instead of recomputing. Let’s create a temporary directory with a specific subdirectory, “task4”:
from tempfile import mkdtemp
from pathlib import Path
cache_dir_tmp = Path(mkdtemp()) / 'task4'
print(cache_dir_tmp)
/tmp/tmp49shyr4y/task4
Now we can pass this path to the cache_dir argument of FunctionTask. To make the execution time noticeable, we define a function that sleeps for 5 s:
@pydra.mark.task
def add_var_wait(a: int, b: int):
    import time
    time.sleep(5)
    return a + b
task4 = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
If you’re running the cell for the first time, it should take around 5 s.
task4()
task4.result()
Result(output=Output(out=10), runtime=None, errored=False)
We can check the output_dir of our task: it should start with the path of cache_dir_tmp, and its last part contains the name of the task class, FunctionTask, followed by the task checksum:
task4.output_dir
PosixPath('/tmp/tmp49shyr4y/task4/FunctionTask_a18730eaae7bc52020f7e092e2d90915')
Let’s see what happens when we define an identical task again with the same cache_dir:
task4a = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
task4a()
Result(output=Output(out=10), runtime=None, errored=False)
This time the result should be ready right away! pydra finds the available results and does not recompute the task.
pydra checks for results not only in cache_dir: you can also provide a list of other locations to check, cache_locations. Let’s create another directory to use as cache_dir, and pass the previous cache directory in cache_locations.
cache_dir_tmp_new = Path(mkdtemp()) / 'task4b'
task4b = add_var_wait(
a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4b()
Result(output=Output(out=10), runtime=None, errored=False)
This time the result should also be returned quickly, and we can check that task4b.output_dir was not created:
task4b.output_dir.exists()
False
If you want to rerun the task even though the results already exist, you can set rerun to True. The task will take several seconds again, and a new output_dir will be created:
cache_dir_tmp_new = Path(mkdtemp()) / 'task4c'
task4c = add_var_wait(
a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4c(rerun=True)
task4c.output_dir.exists()
True
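The lookup order described above (first cache_dir, then every path in cache_locations, unless rerun is set) can be sketched with the standard library. run_cached, the _result.pkl file name and the hashing scheme are assumptions made for this illustration; pydra’s own result files (_result.pklz) and checksums differ.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional


def checksum(func: Callable, inputs: Dict[str, Any]) -> str:
    # hash the bytecode and the sorted inputs (illustrative, not pydra's checksum)
    h = hashlib.md5(func.__code__.co_code)
    h.update(repr(sorted(inputs.items())).encode())
    return h.hexdigest()


def run_cached(func, inputs, cache_dir: Path,
               cache_locations: Optional[List[Path]] = None, rerun: bool = False):
    """Return (result, directory used); reuse an existing result unless rerun=True."""
    name = f"FunctionTask_{checksum(func, inputs)}"
    if not rerun:
        # look in cache_dir first, then in the extra read-only locations
        for location in [cache_dir, *(cache_locations or [])]:
            result_file = location / name / "_result.pkl"
            if result_file.exists():
                return pickle.loads(result_file.read_bytes()), location / name
    # nothing reusable found (or rerun requested): compute and store in cache_dir
    out_dir = cache_dir / name
    out_dir.mkdir(parents=True, exist_ok=True)
    result = func(**inputs)
    (out_dir / "_result.pkl").write_bytes(pickle.dumps(result))
    return result, out_dir


calls = []  # records every real execution of add_var


def add_var(a, b):
    calls.append((a, b))
    return a + b


old_cache = Path(tempfile.mkdtemp()) / "task4"
new_cache = Path(tempfile.mkdtemp()) / "task4b"

run_cached(add_var, {"a": 4, "b": 6}, old_cache)  # computed and stored
res, used = run_cached(add_var, {"a": 4, "b": 6}, new_cache, [old_cache])
print(res, used.parent == old_cache, len(calls))  # 10 True 1
run_cached(add_var, {"a": 4, "b": 6}, new_cache, [old_cache], rerun=True)
print(len(calls))  # 2
```

Keeping cache_locations read-only and writing only to cache_dir mirrors the behaviour described above, where task4b reused the old results without creating its own output_dir.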
If we update an input of the task and run it again, a new directory will be created and the task will be recomputed:
task4b.inputs.a = 1
print(task4b())
print(task4b.output_dir.exists())
Result(output=Output(out=7), runtime=None, errored=False)
True
and when we check the output_dir, we can see that it is different from last time:
task4b.output_dir
PosixPath('/tmp/tmpf31gm_vr/task4b/FunctionTask_2fad104f53ca0af5dc6a027e2002a5ad')
This is because the checksum changes whenever either the inputs or the function change.
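A simplified sketch of that idea derives a directory name from a hash of the function’s code and of its inputs, so that a change to either yields a new name. The task_checksum helper is hypothetical and the choice of MD5 is arbitrary here; its hash differs from pydra’s real checksum, but the FunctionTask_<hash> naming mirrors the directories shown above.

```python
import hashlib
from typing import Any, Callable, Dict


def task_checksum(func: Callable, inputs: Dict[str, Any]) -> str:
    """Hash the function's code and its (sorted) inputs into a hex digest."""
    h = hashlib.md5()
    code = func.__code__
    h.update(code.co_code)                   # the compiled bytecode
    h.update(repr(code.co_consts).encode())  # constants used by the code
    h.update(repr(sorted(inputs.items())).encode())  # order-independent inputs
    return h.hexdigest()


def add_var(a, b):
    return a + b


def mul_var(a, b):
    return a * b


d1 = f"FunctionTask_{task_checksum(add_var, {'a': 4, 'b': 6})}"
d2 = f"FunctionTask_{task_checksum(add_var, {'a': 1, 'b': 6})}"  # input changed
d3 = f"FunctionTask_{task_checksum(mul_var, {'a': 4, 'b': 6})}"  # function changed
print(d1 != d2, d1 != d3)  # True True
```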
2.4.1. Exercise 1
Create a task that takes a list of numbers as input and returns two fields: mean, with the mean value, and std, with the standard deviation.
@pydra.mark.task
@pydra.mark.annotate({'return': {'mean': ty.Any, 'std': ty.Any}})
def mean_dev(my_list: List):
    import statistics as st
    return st.mean(my_list), st.stdev(my_list)
my_task = mean_dev(my_list=[2, 2, 2])
my_task()
my_task.result()
Result(output=Output(mean=2, std=0.0), runtime=None, errored=False)
# write your solution here (you can use the statistics module)
2.5. Using Audit
pydra can record various run-time information, including the workflow provenance, by setting audit_flags and the type of messengers. AuditFlag.RESOURCE allows you to monitor the resource usage of the Task, while AuditFlag.PROV tracks the provenance of the Task.
from pydra.utils.messenger import AuditFlag, PrintMessenger
task5 = add_var(a=4, b=5, audit_flags=AuditFlag.RESOURCE)
task5()
task5.result()
Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.0990180966796875, vms_peak_gb=1.0483818056640626, cpu_peak_percent=0.0), errored=False)
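As a rough, stdlib-only illustration of the kind of measurement AuditFlag.RESOURCE performs (not how pydra collects it), one can time a call and track the peak memory allocated by Python objects with tracemalloc. Note that tracemalloc measures Python allocations, not the process RSS/VMS figures pydra reports; MiniRuntime and run_with_resources are hypothetical names.

```python
import time
import tracemalloc
from dataclasses import dataclass


@dataclass
class MiniRuntime:
    # hypothetical counterpart of pydra's Runtime, with different fields
    wall_time_s: float
    py_peak_mb: float


def run_with_resources(func, **inputs) -> tuple:
    """Run func(**inputs) and return (output, MiniRuntime)."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        output = func(**inputs)
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
        tracemalloc.stop()
    return output, MiniRuntime(wall_time_s=elapsed, py_peak_mb=peak / 1e6)


def build_list(n):
    return len([i * i for i in range(n)])


out, rt = run_with_resources(build_list, n=100_000)
print(out, rt.wall_time_s >= 0, rt.py_peak_mb > 0)  # 100000 True True
```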
One can turn on both audit flags using AuditFlag.ALL, and print the messages on the terminal using the PrintMessenger:
task5 = add_var(
a=4, b=5, audit_flags=AuditFlag.ALL, messengers=PrintMessenger()
)
task5()
task5.result()
id: 1d9cd9752e4d453f83dba3a8c6e99f2a
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@id": "uid:82d37ce4d019459cbb9ad594a9072e38",
"@type": "task",
"startedAtTime": "2025-01-01T03:04:04.641844",
"executedBy": "uid:0d7cad86c28d403490e6e83f4b615902"
}
id: ec9e37e5f43b4a72828a988a267270b8
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@id": "uid:82d37ce4d019459cbb9ad594a9072e38",
"@type": "task",
"Label": "add_var",
"Command": null,
"StartedAtTime": "2025-01-01T03:04:04.845686",
"AssociatedWith": null
}
id: 9ddc305096984eb29a0d1fbe2d9eda3e
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@id": "uid:bdfa25c356fd47e896cf69232d18d2ab",
"@type": "monitor",
"startedAtTime": "2025-01-01T03:04:04.846159",
"wasStartedBy": "uid:82d37ce4d019459cbb9ad594a9072e38"
}
id: ee691f97668c40629237442eec583744
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@id": "uid:bdfa25c356fd47e896cf69232d18d2ab",
"endedAtTime": "2025-01-01T03:04:04.852519",
"wasEndedBy": "uid:82d37ce4d019459cbb9ad594a9072e38"
}
id: 352a8b5d22c84437ae49a6a288eef6e8
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"rss_peak_gb": 0.0990180966796875,
"vms_peak_gb": 1.0483818056640626,
"cpu_peak_percent": 0.0,
"@id": "uid:95231d93c1504bef828f0b69bf7a774d",
"@type": "runtime",
"prov:wasGeneratedBy": "uid:82d37ce4d019459cbb9ad594a9072e38"
}
id: 39ec0658c4434fca8ee33a6c88f32b7a
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@type": "prov:Generation",
"entity_generated": "uid:95231d93c1504bef828f0b69bf7a774d",
"hadActivity": "uid:bdfa25c356fd47e896cf69232d18d2ab"
}
id: 6975a200859a438f99d2e4d6545e2b35
{
"@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
"@id": "uid:82d37ce4d019459cbb9ad594a9072e38",
"endedAtTime": "2025-01-01T03:04:04.852642",
"errored": false
}
Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.0990180966796875, vms_peak_gb=1.0483818056640626, cpu_peak_percent=0.0), errored=False)
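Each printed message above is a small JSON-LD record carrying an @id, an @type, timestamps and links between entities (for example wasStartedBy). A hypothetical, much reduced messenger that emits start and end records of a similar shape could be sketched as follows. ListMessenger and run_with_provenance are invented names, and the records use only a few of the keys pydra emits.

```python
import json
import uuid
from datetime import datetime


class ListMessenger:
    """Hypothetical messenger: prints each record and keeps it in a list."""

    def __init__(self):
        self.messages = []

    def send(self, record: dict) -> None:
        self.messages.append(record)
        print(f"id: {uuid.uuid4().hex}")
        print(json.dumps(record, indent=2))


def run_with_provenance(func, messenger, **inputs):
    task_id = f"uid:{uuid.uuid4().hex}"
    messenger.send({"@id": task_id, "@type": "task",
                    "startedAtTime": datetime.now().isoformat()})
    errored = True
    try:
        output = func(**inputs)
        errored = False
        return output
    finally:
        # the end record is sent whether or not func raised
        messenger.send({"@id": task_id,
                        "endedAtTime": datetime.now().isoformat(),
                        "errored": errored})


def add_var(a, b):
    return a + b


m = ListMessenger()
print(run_with_provenance(add_var, m, a=4, b=5))  # 9, printed after the two records
```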