# FunctionTask

In [1]:
import nest_asyncio

nest_asyncio.apply()

A `FunctionTask` is a `Task` that can be created from any *python* function by using the *pydra* decorator `pydra.mark.task`:

In [2]:
import pydra

@pydra.mark.task
def add_var(a, b):
    return a + b

Once we decorate the function, we can create a pydra `Task` and specify the input:

In [3]:
task0 = add_var(a=4, b=5)

We can check the type of `task0`:

In [4]:
type(task0)

pydra.engine.task.FunctionTask

We can also check that the task has the correct values of `a` and `b`; they should be saved in the task's `inputs`:

In [5]:
print(f'a = {task0.inputs.a}')
print(f'b = {task0.inputs.b}')

a = 4
b = 5


We can also check the content of the entire `inputs`:

In [6]:
task0.inputs

Inputs(a=4, b=5, _func=b'\x80\x05\x95\xbb\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x00K\x02K\x02K\x03C\x0c\x97\x00|\x00|\x01z\x00\x00\x00S\x00\x94N\x85\x94)\x8c\x01a\x94\x8c\x01b\x94\x86\x94\x8c!/tmp/ipykernel_6452/3542708107.py\x94\x8c\x07add_var\x94h\x0eK\x03C\x0b\x80\x00\xe0\x0b\x0c\x88q\x895\x80L\x94C\x00\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94uNNNt\x94R\x94h\x00\x8c\x12_function_setstate\x94\x93\x94h\x18}\x94}\x94(h\x15h\x0e\x8c\x0c__qualname__\x94h\x0e\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.')

As you can see, `task0.inputs` also contains information about the function, which is an inseparable part of the `FunctionTask`.

Once we have the task with its input set, we can run it. Since a `Task` is a callable object, we can use the syntax:

In [7]:
task0()

Result(output=Output(out=9), runtime=None, errored=False)

As you can see, the result was returned right away, but we can also access it later:

In [8]:
task0.result()

Result(output=Output(out=9), runtime=None, errored=False)

`Result` contains more than just an output, so if we want to get the task output, we can type:

In [9]:
result = task0.result()
result.output.out

9

And if we want to see the input that was used in the task, we can set the optional argument `return_inputs` to `True`:

In [10]:
task0.result(return_inputs=True)

({'add_var.a': 4, 'add_var.b': 5},
 Result(output=Output(out=9), runtime=None, errored=False))

## Type-checking

### What is Type-checking?

Type-checking is verifying the type of a value at compile or run time. It ensures that operations or assignments to variables are semantically meaningful and can be executed without type errors, enhancing code reliability and maintainability.

### Why Use Type-checking?

1. **Error Prevention**: Type-checking helps catch type mismatches early, preventing potential runtime errors.
2. **Improved Readability**: Type annotations make it easier to see what types of values a function expects and returns.
3. **Better Documentation**: Explicitly stating expected types acts as inline documentation, simplifying code collaboration and review.
4. **Optimized Performance**: In languages and tools that compile code, explicitly specified types can enable type-related optimizations (CPython itself does not use annotations for this).

### How is Type-checking Implemented in Pydra?

#### Static Type-Checking
Static type-checking is done using Python's type annotations. You annotate the types of your function arguments and the return type and then use a tool like `mypy` to statically check if you're using the function correctly according to those annotations.

In [11]:
@pydra.mark.task
def add(a: int, b: int) -> int:
    return a + b

In [12]:
# This usage is correct according to static type hints:
task1a = add(a=5, b=3)
task1a()

Result(output=Output(out=8), runtime=None, errored=False)

In [13]:
# This usage is incorrect according to the type hints; pydra rejects it with a TypeError:
task1b = add(a="hello", b="world")
task1b()

TypeError: Cannot coerce 'hello' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool_ -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
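
Note that the `TypeError` above is raised by pydra at run time, not by `mypy`: the annotations are also used to validate the inputs. The following is a minimal sketch, in plain Python, of how a decorator can enforce annotations at call time. It is an illustration of the idea only, not pydra's implementation; `enforce_annotations` and `add_checked` are hypothetical names.

```python
import functools
import inspect
import typing


def enforce_annotations(func):
    """Check each argument against the function's type hints before calling it."""
    hints = typing.get_type_hints(func)
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            # Only plain classes are checked here; generic hints are skipped.
            if isinstance(expected, type) and not isinstance(value, expected):
                raise TypeError(
                    f"{name}={value!r} is not an instance of {expected.__name__}"
                )
        return func(*args, **kwargs)

    return wrapper


@enforce_annotations
def add_checked(a: int, b: int) -> int:
    return a + b


print(add_checked(5, 3))
try:
    add_checked("hello", "world")
except TypeError as err:
    print(f"rejected: {err}")
```

Unlike this sketch, pydra also allows some conversions (the error message lists them, for example `int -> float`), so a strict `isinstance` test is a simplification.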

#### Dynamic Type-Checking

Dynamic type-checking is done at runtime. Add dynamic type checks if you want to enforce types when the function is executed.

In [14]:
@pydra.mark.task
def add(a, b):
    if not (isinstance(a, int) and isinstance(b, int)):
        raise TypeError("Both inputs should be integers.")
    return a + b

In [15]:
# This usage is correct and will not raise a runtime error:
task1c = add(a=5, b=3)
task1c()

Result(output=Output(out=8), runtime=None, errored=False)

In [16]:
# This usage is incorrect and will raise a runtime TypeError:
task1d = add(a="hello", b="world")
task1d()

TypeError: Both inputs should be integers.

#### Checking Complex Types

For more complex types like lists, dictionaries, or custom objects, we can use type hints combined with dynamic checks.

In [17]:
from typing import List, Tuple

@pydra.mark.task
def sum_of_pairs(pairs: List[Tuple[int, int]]) -> List[int]:
    if not all(isinstance(pair, tuple) and len(pair) == 2 for pair in pairs):
        raise ValueError("Input should be a list of pairs (tuples with 2 integers each).")
    return [sum(pair) for pair in pairs]

In [18]:
# Correct usage
task1e = sum_of_pairs(pairs=[(1, 2), (3, 4)])  
task1e()

Result(output=Output(out=[3, 7]), runtime=None, errored=False)

In [19]:
# This raises a TypeError: pydra's type check rejects "4" before the function body runs
task1f = sum_of_pairs(pairs=[(1, 2), (3, "4")])  
task1f()

TypeError: Cannot coerce '4' into <class 'int'> as the coercion doesn't match any of the explicit inclusion criteria: Sequence -> Sequence, Mapping -> Mapping, Path -> PathLike, str -> PathLike, PathLike -> Path, PathLike -> str, Any -> MultiInputObj, int -> float, integer -> int, floating -> float, bool_ -> bool, integer -> float, character -> str, complexfloating -> complex, bytes_ -> bytes, ndarray -> Sequence, Sequence -> ndarray
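
The error above shows that nested annotations such as `List[Tuple[int, int]]` are checked element by element. As a rough sketch of how such a check can be written with the standard `typing` helpers `get_origin` and `get_args`, consider the hypothetical function `matches` below. It handles only plain classes, subscripted lists, and fixed-length tuples, and it is not how pydra implements its check.

```python
from typing import Any, List, Tuple, get_args, get_origin


def matches(value: Any, hint: Any) -> bool:
    """Return True if `value` conforms to `hint`.

    Supports plain classes, List[X], and fixed-length Tuple[X, Y, ...] only.
    """
    origin = get_origin(hint)
    args = get_args(hint)
    if origin is None:
        # Plain class such as int or str
        return isinstance(value, hint)
    if origin is list:
        # Every element must match the single element hint
        return isinstance(value, list) and all(matches(v, args[0]) for v in value)
    if origin is tuple:
        # One hint per position, so the lengths must agree
        return (
            isinstance(value, tuple)
            and len(value) == len(args)
            and all(matches(v, a) for v, a in zip(value, args))
        )
    raise NotImplementedError(f"unsupported hint: {hint!r}")


PairList = List[Tuple[int, int]]

print(matches([(1, 2), (3, 4)], PairList))
print(matches([(1, 2), (3, "4")], PairList))
```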

## Customizing output names
Note that "out" is the default name for the task output, but we can always customize it. There are two ways of doing it: using a *python* function annotation or using another *pydra* decorator.

Let's start with the function annotation:

In [20]:
import typing as ty

@pydra.mark.task
def add_var_an(a: int, b: int) -> {'sum_a_b': int}:
    return a + b


task2a = add_var_an(a=4, b=5)
task2a()

Result(output=Output(sum_a_b=9), runtime=None, errored=False)

The annotation is especially useful for specifying output names when the function returns multiple values.

In [21]:
@pydra.mark.task
def modf_an(a: float) -> {'fractional': ty.Any, 'integer': ty.Any}:
    import math

    return math.modf(a)


task2b = modf_an(a=3.5)
task2b()

Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)
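
The names in the annotation are matched to the returned values by position, so their order has to follow the order of the returned tuple (`math.modf` returns the fractional part first). A small plain-Python sketch of that pairing, with the hypothetical helpers `name_outputs` and `modf_plain`; it only illustrates the idea and does not use pydra:

```python
import math


def name_outputs(func, *args):
    """Pair the keys of a dict-style return annotation with the returned values."""
    names = list(func.__annotations__["return"])
    values = func(*args)
    if len(names) == 1:
        # A single output is not unpacked
        values = (values,)
    return dict(zip(names, values))


def modf_plain(a: float) -> {"fractional": float, "integer": float}:
    return math.modf(a)


print(name_outputs(modf_plain, 3.5))  # {'fractional': 0.5, 'integer': 3.0}
```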

The second way of customizing the output requires another decorator, `pydra.mark.annotate`:

In [22]:
@pydra.mark.task
@pydra.mark.annotate({'return': {'fractional': ty.Any, 'integer': ty.Any}})
def modf(a: float):
    import math

    return math.modf(a)

task2c = modf(a=3.5)
task2c()

Result(output=Output(fractional=0.5, integer=3.0), runtime=None, errored=False)

**Note that the order of the pydra decorators is important: `pydra.mark.task` must be the outermost (top) decorator, as in the example above.**
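
The reason order matters is that stacked decorators are applied bottom-up: the one closest to the function runs first. The sketch below shows this with two hypothetical stand-ins, `annotate` and `snapshot_outputs`. They are simplified illustrations and do not reproduce pydra's actual mechanism.

```python
import math


def annotate(annotations):
    """Hypothetical stand-in: attach extra annotations to a function."""

    def decorator(func):
        func.__annotations__.update(annotations)
        return func

    return decorator


def snapshot_outputs(func):
    """Hypothetical stand-in for a task decorator.

    It reads the return annotation once, at decoration time.
    """
    func.output_names = list(func.__annotations__.get("return", {"out": None}))
    return func


@snapshot_outputs  # applied second: sees the annotation added below
@annotate({"return": {"fractional": float, "integer": float}})
def right_order(a: float):
    return math.modf(a)


@annotate({"return": {"fractional": float, "integer": float}})
@snapshot_outputs  # applied first: the annotation is not there yet
def wrong_order(a: float):
    return math.modf(a)


print(right_order.output_names)
print(wrong_order.output_names)
```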

## Setting the input

We don't have to provide the input when we create a task; we can always set it later:

In [23]:
task3 = add_var()
task3.inputs.a = 4
task3.inputs.b = 5
task3()

Result(output=Output(out=9), runtime=None, errored=False)

If we don't specify the input, `attr.NOTHING` will be used as the default value:

In [24]:
task3a = add_var()
task3a.inputs.a = 4

# import the attr library and check whether `b` is still unset
import attr

task3a.inputs.b == attr.NOTHING

True

And if we try to run the task, an error will be raised:

In [25]:
task3a()

TypeError: unsupported operand type(s) for +: 'int' and '_Nothing'
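
The error message above points at the `+` operation rather than at the missing input. One way to get a clearer message is to look for unset inputs before running. The sketch below shows the sentinel pattern in plain Python; `NOTHING` and `unset_inputs` are hypothetical names that only play the role of `attr.NOTHING` and are not part of pydra.

```python
# Hypothetical sentinel standing in for attr.NOTHING.
# A dedicated object is used instead of None, because None can be a valid input.
NOTHING = object()


def unset_inputs(inputs: dict) -> list:
    """Return the names of inputs that still hold the sentinel."""
    return [name for name, value in inputs.items() if value is NOTHING]


inputs = {"a": 4, "b": NOTHING}
missing = unset_inputs(inputs)
if missing:
    print(f"cannot run, unset inputs: {missing}")
```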

## Output directory and caching the results

After running the task, we can check where the output directory with the results was created:

In [26]:
task3.output_dir

PosixPath('/tmp/tmpgccjsdko/FunctionTask_c3a2dd2f4d8f08f1286f136c748dd2ed')

Within the directory you can find the file with the results: `_result.pklz`.

In [27]:
import os

In [28]:
os.listdir(task3.output_dir)

['_task.pklz', '_result.pklz']

But we can also provide the path where we want to store the results. If a path is provided for the cache directory, then pydra will use the cached results of a node instead of recomputing the result. Let's create a temporary directory and a specific subdirectory "task4":

In [29]:
from tempfile import mkdtemp
from pathlib import Path

In [30]:
cache_dir_tmp = Path(mkdtemp()) / 'task4'
print(cache_dir_tmp)

/tmp/tmpqm3swbu2/task4


Now we can pass this path to the `cache_dir` argument of the `FunctionTask`. To observe the execution time, we define a function that sleeps for 5 s:

In [31]:
@pydra.mark.task
def add_var_wait(a: int, b: int):
    import time

    time.sleep(5)
    return a + b

task4 = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)

If you're running the cell for the first time, it should take around 5 s.

In [32]:
task4()
task4.result()

Result(output=Output(out=10), runtime=None, errored=False)

We can check the `output_dir` of our task: it should start with the path of `cache_dir_tmp`, and its last part contains the name of the task class, `FunctionTask`, and the task checksum:

In [33]:
task4.output_dir

PosixPath('/tmp/tmpqm3swbu2/task4/FunctionTask_3f42c96bdca5034892978e28d2f91773')

Let's see what happens when we define an identical task again with the same `cache_dir`:

In [34]:
task4a = add_var_wait(a=4, b=6, cache_dir=cache_dir_tmp)
task4a()

Result(output=Output(out=10), runtime=None, errored=False)

This time the result should be ready right away! *pydra* uses the available results and does not recompute the task.

*pydra* not only checks for results in `cache_dir`; you can also provide a list of other locations to check. Let's create another directory that will be used as `cache_dir`, and pass the previous working directory in `cache_locations`.

In [35]:
cache_dir_tmp_new = Path(mkdtemp()) / 'task4b'

task4b = add_var_wait(
    a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4b()

Result(output=Output(out=10), runtime=None, errored=False)

This time the results should also be returned quickly! And we can check that `task4b.output_dir` was not created:

In [36]:
task4b.output_dir.exists()

False
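
The behaviour above (an existing result is reused and no new directory is written) can be pictured with the following plain-Python sketch. It assumes a result file named `_result.pklz` stored with the standard `pickle` module; `find_cached`, `run_cached`, and the directory name `FunctionTask_demo` are hypothetical, and pydra's real lookup and file format may differ.

```python
import pickle
import tempfile
from pathlib import Path


def find_cached(dirname, locations):
    """Return the first existing result file for `dirname`, or None."""
    for location in locations:
        candidate = Path(location) / dirname / "_result.pklz"
        if candidate.exists():
            return candidate
    return None


def run_cached(func, inputs, dirname, cache_dir, cache_locations=()):
    """Reuse a stored result if one exists, otherwise compute and store it."""
    hit = find_cached(dirname, [cache_dir, *cache_locations])
    if hit is not None:
        return pickle.loads(hit.read_bytes())
    result = func(**inputs)
    out_dir = Path(cache_dir) / dirname
    out_dir.mkdir(parents=True)
    (out_dir / "_result.pklz").write_bytes(pickle.dumps(result))
    return result


calls = []


def add_logged(a, b):
    calls.append((a, b))  # record every real execution
    return a + b


old_cache = Path(tempfile.mkdtemp()) / "task4"
new_cache = Path(tempfile.mkdtemp()) / "task4b"

first = run_cached(add_logged, {"a": 4, "b": 6}, "FunctionTask_demo", old_cache)
second = run_cached(
    add_logged, {"a": 4, "b": 6}, "FunctionTask_demo", new_cache, [old_cache]
)
print(first, second, len(calls), new_cache.exists())
```

The second call finds the result under `old_cache`, so the function runs only once and `new_cache` is never created.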

If you want to rerun the task even though the results already exist, you can set `rerun` to `True`. The task will take several seconds and a new `output_dir` will be created:

In [37]:
cache_dir_tmp_new = Path(mkdtemp()) / 'task4c'

task4c = add_var_wait(
    a=4, b=6, cache_dir=cache_dir_tmp_new, cache_locations=[cache_dir_tmp]
)
task4c(rerun=True)

task4c.output_dir.exists()

True

If we update the input of the task and run it again, a new directory will be created and the task will be recomputed:

In [38]:
task4b.inputs.a = 1
print(task4b())
print(task4b.output_dir.exists())

Result(output=Output(out=7), runtime=None, errored=False)
True


and when we check the `output_dir`, we can see that it's different from last time:

In [39]:
task4b.output_dir

PosixPath('/tmp/tmpk0srwvs8/task4b/FunctionTask_db43fd653c264ada9ddc5dae0774d20c')

This is because the checksum changes when we change either the input or the function.
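
To make the idea of such a checksum concrete, here is a simplified sketch that hashes a function's bytecode together with its inputs using `hashlib`. It is only an illustration: `task_checksum`, `add_plain`, and `sub_plain` are hypothetical names, the bytecode alone does not capture constants or globals, and pydra computes its hashes differently.

```python
import hashlib
import json


def task_checksum(func, inputs: dict) -> str:
    """Hash the function's bytecode together with its (JSON-serializable) inputs."""
    # sort_keys makes the hash independent of the order the inputs were given in
    payload = func.__code__.co_code + json.dumps(inputs, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()[:32]


def add_plain(a, b):
    return a + b


def sub_plain(a, b):
    return a - b


base = task_checksum(add_plain, {"a": 4, "b": 6})

# Same function and same inputs: same checksum, so a cached result can be reused
print(base == task_checksum(add_plain, {"b": 6, "a": 4}))
# Different input: different checksum, hence a different output directory
print(base == task_checksum(add_plain, {"a": 1, "b": 6}))
# Different function body: different checksum as well
print(base == task_checksum(sub_plain, {"a": 4, "b": 6}))
```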

### Exercise 1
Create a task that takes a list of numbers as input and returns two fields: `mean` with the mean value and `std` with the standard deviation.

In [40]:
@pydra.mark.task
@pydra.mark.annotate({'return': {'mean': ty.Any, 'std': ty.Any}})
def mean_dev(my_list: List):
    import statistics as st

    return st.mean(my_list), st.stdev(my_list)

my_task = mean_dev(my_list=[2, 2, 2])
my_task()
my_task.result()

Result(output=Output(mean=2, std=0.0), runtime=None, errored=False)

In [41]:
# write your solution here (you can use statistics module)

## Using Audit

*pydra* can record various runtime information, including the workflow provenance, if you set `audit_flags` and the type of messengers.

`AuditFlag.RESOURCE` allows you to monitor resource usage for the `Task`, while `AuditFlag.PROV` tracks the provenance of the `Task`.

In [42]:
from pydra.utils.messenger import AuditFlag, PrintMessenger

task5 = add_var(a=4, b=5, audit_flags=AuditFlag.RESOURCE)
task5()
task5.result()

Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.10631561328125, vms_peak_gb=0.9318733212890625, cpu_peak_percent=247.5), errored=False)

One can turn on both audit flags using `AuditFlag.ALL`, and print the messages on the terminal using the `PrintMessenger`.

In [43]:
task5 = add_var(
    a=4, b=5, audit_flags=AuditFlag.ALL, messengers=PrintMessenger()
)
task5()
task5.result()

id: 12044c5660c34c0ba99f19556758dda0
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:c9be22ea8057457e9e67a664d4bb8820",
  "@type": "task",
  "startedAtTime": "2023-10-17T01:16:48.268073",
  "executedBy": "uid:aebfcf9b75f140ef819dbf29363d7b3d"
}


id: d1a3bd088a2b408ba4ad723b00e2296a
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:c9be22ea8057457e9e67a664d4bb8820",
  "@type": "task",
  "Label": "add_var",
  "Command": null,
  "StartedAtTime": "2023-10-17T01:16:48.473775",
  "AssociatedWith": null
}
id: 5d9aae9e266848d6b7bf33b130ca0c58
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:a0cb069945ca4a218c7a2c53dbf2dc86",
  "@type": "monitor",
  "startedAtTime": "2023-10-17T01:16:48.474343",
  "wasStartedBy": "uid:c9be22ea8057457e9e67a664d4bb8820"
}
id: 1ac55861990243e69606c502a96af1e2
{
  "@context": "https://raw.githubusercontent.com/nipype/pydra/master/pydra/schema/context.jsonld",
  "@id": "uid:a0cb069945ca4a218c7a2c53dbf2dc86",
  "endedAtTime": "2023-10-17T01:16:48.480454",
  "wasEndedBy": "uid:c9be22ea8057457e9e67a664d4bb8820"
}
id: a5f28a1da9ac47d48b1c089a67dd8721
{
  "@context": "https://raw.git

Result(output=Output(out=9), runtime=Runtime(rss_peak_gb=0.1045722958984375, vms_peak_gb=0.9318733212890625, cpu_peak_percent=209.4), errored=False)