{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Canonical form and serialisation\n", "\n", "## Canonical task form\n", "\n", "Under the hood, all Python, shell and workflow tasks generated by the\n", "`pydra.compose.*.define` decorators/functions are translated to\n", "[dataclass](https://docs.python.org/3/library/dataclasses.html)-like classes by the\n", "[attrs](https://www.attrs.org/en/stable/) library. While the more compact syntax described\n", "in the [Python-tasks](./4-python.html), [Shell-tasks](./5-shell.html) and [Workflow](./6-workflow.html)\n", "tutorials is convenient when designing tasks for specific use cases, it is too magical\n", "for linters to follow. Therefore, when designing tasks to be used by third\n", "parties (e.g. `pydra-fsl`, `pydra-ants`) it is recommended to favour the more\n", "explicit \"canonical\" dataclass form.\n", "\n", "The syntax of the canonical form is close to that used by the\n", "[attrs](https://www.attrs.org/en/stable/) package itself, with class type annotations\n", "used to define the fields of the inputs and outputs of the task. Tasks defined in canonical\n", "form can be statically type-checked by [MyPy](https://mypy-lang.org/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python-tasks\n", "\n", "Python tasks in dataclass form are decorated by `pydra.compose.python.define`\n", "with inputs listed as type annotations. Outputs are similarly defined in a nested class\n", "called `Outputs`. The function to be executed should be a staticmethod called `function`.\n", "Default values can also be set directly, as with attrs classes.\n", "\n", "To allow static type-checkers to check the types of the outputs of tasks added\n", "to workflows, it is also necessary to explicitly extend the `pydra.compose.python.Task`\n", "and `pydra.compose.python.Outputs` classes (they are otherwise set as bases by the\n", "`define` decorator implicitly). 
Thus the \"canonical form\" of a Python task is as\n", "follows:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.utils import print_help\n", "from pydra.compose import python\n", "\n", "\n", "@python.define\n", "class CanonicalPythonTask(python.Task[\"CanonicalPythonTask.Outputs\"]):\n", "    \"\"\"Canonical Python task class for testing\n", "\n", "    Args:\n", "        a: First input\n", "            to be inputted\n", "        b: Second input\n", "    \"\"\"\n", "\n", "    a: int\n", "    b: float = 2.0  # set default value\n", "\n", "    class Outputs(python.Outputs):\n", "        \"\"\"\n", "        Args:\n", "            c: Sum of a and b\n", "            d: Quotient of a and b\n", "        \"\"\"\n", "\n", "        c: float\n", "        d: float\n", "\n", "    @staticmethod\n", "    def function(a, b):\n", "        return a + b, a / b\n", "\n", "\n", "print_help(CanonicalPythonTask)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To set additional attributes other than the type and default, such as `allowed_values`\n", "and `validator`, `python.arg` and `python.out` can be used instead." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import attrs.validators\n", "\n", "\n", "@python.define\n", "class CanonicalPythonTask(python.Task[\"CanonicalPythonTask.Outputs\"]):\n", "    \"\"\"Canonical Python task class for testing\n", "\n", "    Args:\n", "        a: First input\n", "            to be inputted\n", "        b: Second input\n", "    \"\"\"\n", "\n", "    a: int = python.arg(allowed_values=[1, 2, 3, 4, 5])\n", "    b: float = python.arg(\n", "        default=2.0,\n", "        # not_() wraps another validator, so reject zero by inverting in_([0])\n", "        validator=attrs.validators.not_(attrs.validators.in_([0])),\n", "    )\n", "\n", "    class Outputs(python.Outputs):\n", "        \"\"\"\n", "        Args:\n", "            c: Sum of a and b\n", "            d: Quotient of a and b\n", "        \"\"\"\n", "\n", "        c: float\n", "        d: float\n", "\n", "    @staticmethod\n", "    def function(a, b):\n", "        return a + b, a / b\n", "\n", "\n", "print_help(CanonicalPythonTask)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Shell-tasks\n", "\n", "The canonical form of shell tasks is the same as for Python tasks, except that a string `executable`\n", "attribute replaces the `function` staticmethod." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "from pathlib import Path\n", "from fileformats import generic\n", "from pydra.compose import shell\n", "\n", "\n", "# the \"copied\" output is passed to this function automatically because the\n", "# argument name matches the output name\n", "def get_file_size(copied: Path) -> int:\n", "    \"\"\"Calculate the file size\"\"\"\n", "    result = os.stat(copied)\n", "    return result.st_size\n", "\n", "\n", "@shell.define\n", "class CpFileWithSize(shell.Task[\"CpFileWithSize.Outputs\"]):\n", "\n", "    executable = \"cp\"\n", "\n", "    in_file: generic.File  # = shell.arg() is assumed\n", "    archive_mode: bool = shell.arg(argstr=\"-a\", default=False)\n", "\n", "    class Outputs(shell.Outputs):\n", "\n", "        copied: generic.File = shell.outarg(\n", "            position=-1, path_template=\"{in_file}_copied\"\n", "        )\n", "        file_size: int = shell.out(callable=get_file_size)\n", "\n", "\n", "print_help(CpFileWithSize)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workflow definitions\n", "\n", "Workflows can also be defined in canonical form, which is the same as for Python tasks\n", "but with a staticmethod called `constructor` that constructs the workflow." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import typing as ty\n", "import re\n", "from pydra.compose import python, workflow\n", "from pydra.utils import print_help, show_workflow\n", "\n", "\n", "# Example python tasks\n", "@python.define\n", "def Add(a, b):\n", "    return a + b\n", "\n", "\n", "@python.define\n", "def Mul(a, b):\n", "    return a * b\n", "\n", "\n", "@workflow.define\n", "class CanonicalWorkflowTask(workflow.Task[\"CanonicalWorkflowTask.Outputs\"]):\n", "\n", "    @staticmethod\n", "    def str2num(value: ty.Any) -> float | int:\n", "        if isinstance(value, str) and re.match(r\"^\\d+(\\.\\d+)?$\", value):\n", "            # parse numeric strings without eval: float if a decimal point is present\n", "            return float(value) if \".\" in value else int(value)\n", "        return value\n", "\n", "    a: int\n", "    b: float = workflow.arg(help=\"A float input\", converter=str2num)\n", "\n", "    @staticmethod\n", "    def constructor(a, b):\n", "        add = workflow.add(Add(a=a, b=b))\n", "        mul = workflow.add(Mul(a=add.out, b=b))\n", "        return mul.out\n", "\n", "    class Outputs(workflow.Outputs):\n", "        out: float\n", "\n", "\n", "print_help(CanonicalWorkflowTask)\n", "show_workflow(CanonicalWorkflowTask)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Converting to/from dictionaries\n", "\n", "As well as the dataclass-like canonical form, it is also possible to represent all tasks\n", "in a nested dictionary form, which could be written to a static file (e.g. in JSON or\n", "YAML format). The dictionary form of a class can be generated by the `pydra.utils.unstructure`\n", "function. 
For example, the following shell command" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "MyCmd = shell.define(\n", "    \"my-cmd --an-arg \"\n", "    \"--a-flag --arg-with-default \"\n", ")\n", "\n", "print_help(MyCmd)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "can be converted into a serialised dictionary form:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pprint import pprint\n", "from pydra.utils import unstructure\n", "\n", "my_cmd_dict = unstructure(MyCmd)\n", "\n", "pprint(my_cmd_dict)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that a little more work is still required to serialise some Python\n", "objects (e.g. classes used in field types, functions run by Python tasks and\n", "constructors of workflow tasks) before the serialised form can be written to JSON/YAML." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cp_with_size_dict = unstructure(CpFileWithSize)\n", "\n", "pprint(cp_with_size_dict)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To deserialise the dictionary form back into a Task class, use the\n", "`pydra.utils.structure` function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.utils import structure\n", "\n", "ReloadedCpFileWithSize = structure(cp_with_size_dict)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The reloaded task should run just as before:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "import tempfile\n", "from pydra.utils import asdict\n", "\n", "tmp_dir = Path(tempfile.mkdtemp())\n", "\n", "a_file = tmp_dir / \"hello-world.txt\"\n", "a_file.write_text(\"Hello world\")\n", "\n", "cp_file_with_size = ReloadedCpFileWithSize(in_file=a_file)\n", 
"outputs = cp_file_with_size(cache_root=tmp_dir / \"cache\")\n", "\n", "pprint(asdict(outputs))" ] } ], "metadata": { "kernelspec": { "display_name": "wf13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 2 }