Canonical task form

Under the hood, all Python, shell and workflow tasks generated by the pydra.compose.*.define decorators/functions are translated to dataclass-like classes by the attrs library. While the more compact syntax described in the Python-tasks, Shell-tasks and Workflow tutorials is convenient when designing tasks for specific use cases, it is too magical for linters to follow. Therefore, when designing tasks to be used by third parties (e.g. pydra-fsl, pydra-ants), it is recommended to favour the more explicit "canonical" dataclass form.

The syntax of the canonical form is close to that used by the attrs package itself, with class type annotations used to define the input and output fields of the task. Tasks defined in canonical form can be statically type-checked by mypy.
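As a rough illustration of how class annotations become fields, the sketch below uses the standard-library dataclasses module as a stand-in for attrs. This is an assumption made only to keep the example dependency-free: pydra itself relies on attrs, and ExampleInputs is a hypothetical name.

```python
from dataclasses import dataclass, fields


@dataclass
class ExampleInputs:
    """Hypothetical inputs class; not a pydra task."""

    a: int  # required field, declared purely by its annotation
    b: float = 2.0  # optional field with a default value


# Each annotated attribute is collected into a field in declaration order
field_names = [f.name for f in fields(ExampleInputs)]
field_types = {f.name: f.type for f in fields(ExampleInputs)}

# The generated __init__ accepts the fields as keyword arguments
inputs = ExampleInputs(a=1)
```

Because the fields are ordinary annotated class attributes, linters and type checkers can read them without running the decorator.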

Python-tasks

Python tasks in dataclass form are decorated with pydra.compose.python.define, with inputs listed as type annotations. Outputs are similarly defined in a nested class called Outputs. The function to be executed should be a staticmethod called function. Default values can also be set directly, as with attrs classes.

In order to allow static type-checkers to check the types of the outputs of tasks added to workflows, it is also necessary to explicitly extend the pydra.compose.python.Task and pydra.compose.python.Outputs classes (they are otherwise set as bases implicitly by the define decorator). Thus the "canonical form" of a Python task is as follows:

[1]:
from pydra.utils import print_help
from pydra.compose import python


@python.define
class CanonicalPythonTask(python.Task["CanonicalPythonTask.Outputs"]):
    """Canonical Python task class for testing

    Args:
        a: First input
            to be inputted
        b: Second input
    """

    a: int
    b: float = 2.0  # set default value

    class Outputs(python.Outputs):
        """
        Args:
            c: Sum of a and b
            d: Product of a and b
        """

        c: float
        d: float

    @staticmethod
    def function(a, b):
        return a + b, a * b


print_help(CanonicalPythonTask)
------------------------------------------
Help for Python task 'CanonicalPythonTask'
------------------------------------------

Inputs:
- b: float; default = 2.0
    Second input
- a: int
    First input to be inputted
- function: Callable[]; default = function()

Outputs:
- c: float
    Sum of a and b
- d: float
    Product of a and b
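Why the explicit python.Task["CanonicalPythonTask.Outputs"] base helps static type-checkers can be illustrated with a plain typing.Generic sketch. The classes SketchTask, AddOutputs and AddTask are hypothetical stand-ins, not pydra's classes, and the run method is an assumption made for illustration only.

```python
from typing import Generic, TypeVar

OutputsT = TypeVar("OutputsT")


class SketchTask(Generic[OutputsT]):
    """Hypothetical task base class, parameterised by its outputs type."""

    def run(self) -> OutputsT:
        raise NotImplementedError


class AddOutputs:
    """Hypothetical outputs container with a single typed field."""

    def __init__(self, total: float) -> None:
        self.total = total


class AddTask(SketchTask[AddOutputs]):
    """Binding OutputsT to AddOutputs tells the checker what run() returns."""

    def __init__(self, a: int, b: float) -> None:
        self.a = a
        self.b = b

    def run(self) -> AddOutputs:
        return AddOutputs(total=self.a + self.b)


# A type checker infers `result` as AddOutputs, so `result.total` is a float
result = AddTask(a=1, b=2.0).run()
```

Without the explicit generic base, the link between a task and its outputs class would only exist at runtime, where a static checker cannot see it.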

To set additional attributes other than the type and default, such as allowed_values and validators, python.arg and python.out can be used instead.

[2]:
import attrs.validators


@python.define
class CanonicalPythonTask(python.Task["CanonicalPythonTask.Outputs"]):
    """Canonical Python task class for testing

    Args:
        a: First input
            to be inputted
        b: Second input
    """

    a: int = python.arg(allowed_values=[1, 2, 3, 4, 5])
    # not_() wraps another validator, so zero is rejected by negating in_([0])
    b: float = python.arg(default=2.0, validator=attrs.validators.not_(attrs.validators.in_([0])))

    class Outputs(python.Outputs):
        """
        Args:
            c: Sum of a and b
            d: Product of a and b
        """

        c: float
        d: float

    @staticmethod
    def function(a, b):
        return a + b, a * b


print_help(CanonicalPythonTask)
------------------------------------------
Help for Python task 'CanonicalPythonTask'
------------------------------------------

Inputs:
- a: int (allowed_values=frozenset({1, 2, 3, 4, 5}))
    First input to be inputted
- b: float; default = 2.0
    Second input
- function: Callable[]; default = function()

Outputs:
- c: float
    Sum of a and b
- d: float
    Product of a and b
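What a restricted set of allowed values and a non-zero check are meant to enforce can be sketched without pydra or attrs. The names CheckedInputs, ALLOWED_A and is_valid below are hypothetical, and the checks are written by hand in __post_init__; pydra's own validation goes through attrs and may report errors differently.

```python
from dataclasses import dataclass

# Mirrors the allowed values used for `a` in the task above
ALLOWED_A = frozenset({1, 2, 3, 4, 5})


@dataclass
class CheckedInputs:
    """Hypothetical inputs class with hand-written validation."""

    a: int
    b: float = 2.0

    def __post_init__(self) -> None:
        # Reject values of `a` outside the allowed set
        if self.a not in ALLOWED_A:
            raise ValueError(f"a must be one of {sorted(ALLOWED_A)}, got {self.a!r}")
        # Reject a zero value for `b`
        if self.b == 0:
            raise ValueError("b must be non-zero")


def is_valid(**kwargs) -> bool:
    """Return True if the given inputs pass validation."""
    try:
        CheckedInputs(**kwargs)
    except ValueError:
        return False
    return True
```

Declaring such constraints on the field itself, as python.arg does, keeps them next to the type annotation instead of scattering checks through the task function.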

Shell-tasks

The canonical form of shell tasks is the same as for Python tasks, except a string executable attribute replaces the function staticmethod.

[3]:
import os
from pathlib import Path
from fileformats import generic
from pydra.compose import shell
from pydra.utils.typing import MultiInputObj


@shell.define
class CpWithSize(shell.Task["CpWithSize.Outputs"]):

    executable = "cp"

    in_fs_objects: MultiInputObj[generic.FsObject]
    recursive: bool = shell.arg(argstr="-R")
    text_arg: str = shell.arg(argstr="--text-arg")
    int_arg: int | None = shell.arg(argstr="--int-arg")
    tuple_arg: tuple[int, str] | None = shell.arg(argstr="--tuple-arg")

    class Outputs(shell.Outputs):

        @staticmethod
        def get_file_size(out_file: Path) -> int:
            """Calculate the file size"""
            result = os.stat(out_file)
            return result.st_size

        copied: generic.FsObject = shell.outarg(path_template="copied")
        out_file_size: int = shell.out(callable=get_file_size)


print_help(CpWithSize)
--------------------------------
Help for Shell task 'CpWithSize'
--------------------------------

Inputs:
- executable: str | Sequence[str]; default = 'cp'
    the first part of the command, can be a string, e.g. 'ls', or a list, e.g.
    ['ls', '-l', 'dirname']
- int_arg: int | None ('--int-arg')
- recursive: bool ('-R')
- text_arg: str ('--text-arg')
- tuple_arg: tuple[int, str] | None ('--tuple-arg')
- in_fs_objects: MultiInputObj[generic/fs-object]
- copied: Path | bool; default = True
    The path specified for the output file, if True, the default 'path
    template' will be used.
- append_args: list[str | generic/file]; default-factory = list()
    Additional free-form arguments to append to the end of the command.

Outputs:
- copied: generic/fs-object
- out_file_size: int
- return_code: int
    The process' exit code.
- stderr: str
    The standard error stream produced by the command.
- stdout: str
    The standard output stream produced by the command.
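The get_file_size callable used for out_file_size can be tried on its own, outside any task. The sketch below uses shutil.copy in place of the cp executable and a temporary directory with made-up file contents; both are assumptions for illustration and not how pydra runs the command.

```python
import os
import shutil
import tempfile
from pathlib import Path


def get_file_size(out_file: Path) -> int:
    """Calculate the file size in bytes (same logic as in CpWithSize.Outputs)."""
    return os.stat(out_file).st_size


with tempfile.TemporaryDirectory() as tmp:
    # Create a small source file with known contents
    src = Path(tmp) / "source.txt"
    src.write_bytes(b"hello pydra\n")

    # Stand-in for the `cp` call: copy to a file named after the path template
    copied = Path(tmp) / "copied"
    shutil.copy(src, copied)

    # The callable receives the output path and returns its size
    size = get_file_size(copied)
```

Keeping the callable a plain staticmethod that takes a path makes it easy to test in isolation like this.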

Workflow definitions

Workflows can also be defined in canonical form, which is the same as for Python tasks except that a staticmethod called constructor, which builds the workflow, replaces the function staticmethod.

[4]:
import typing as ty
import re
from pydra.compose import python, workflow
from pydra.compose.base import is_set
from pydra.utils import print_help, show_workflow


# Example python tasks
@python.define
def Add(a, b):
    return a + b


@python.define
def Mul(a, b):
    return a * b


@workflow.define
class CanonicalWorkflowTask(workflow.Task["CanonicalWorkflowTask.Outputs"]):

    @staticmethod
    def str2num(value: ty.Any) -> float | int:
        if isinstance(value, str) and re.match(r"^\d+(\.\d+)?$", value):
            return float(value) if "." in value else int(value)  # avoid eval, which is unsafe and fails on leading zeros
        return value

    a: int
    b: float = workflow.arg(help="A float input", converter=str2num)

    @staticmethod
    def constructor(a, b):
        add = workflow.add(Add(a=a, b=b))
        mul = workflow.add(Mul(a=add.out, b=b))
        return mul.out

    class Outputs(workflow.Outputs):
        out: float


print_help(CanonicalWorkflowTask)
show_workflow(CanonicalWorkflowTask)
----------------------------------------------
Help for Workflow task 'CanonicalWorkflowTask'
----------------------------------------------

Inputs:
- b: float
    A float input
- a: int
- constructor: Callable[]; default = constructor()

Outputs:
- out: float

[Figure: workflow graph of CanonicalWorkflowTask produced by show_workflow (../_images/tutorial_7-canonical-form_8_1.png)]
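The arithmetic that the workflow above wires together can be checked with an eager, plain-Python sketch. This is only an approximation: in pydra the constructor connects nodes rather than computing values directly, and the helper names add, mul and eager_constructor are hypothetical. The str2num converter here parses with int and float instead of eval.

```python
import re
from typing import Any


def str2num(value: Any) -> Any:
    """Convert numeric strings such as "3" or "2.5" to numbers; pass others through."""
    if isinstance(value, str) and re.match(r"^\d+(\.\d+)?$", value):
        return float(value) if "." in value else int(value)
    return value


def add(a, b):
    return a + b


def mul(a, b):
    return a * b


def eager_constructor(a, b):
    """Eager stand-in for the workflow: out = (a + b) * b."""
    b = str2num(b)  # the converter is applied to the input before use
    return mul(add(a, b), b)


result = eager_constructor(1, "2.0")  # (1 + 2.0) * 2.0
```

The same data flow, Add feeding its output into Mul, is what the workflow graph depicts.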