{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Workflows\n", "\n", "In Pydra, workflows are directed acyclic graphs (DAGs) of component tasks to be executed on specified inputs.\n", "Workflow definitions are dataclasses, which are interchangeable with Python and shell task\n", "definitions and are executed in the same way." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Constructor functions\n", "\n", "Workflows are typically defined using the `pydra.compose.workflow.define` decorator on\n", "a \"constructor\" function that generates the workflow. For example, given two task\n", "definitions, `Add` and `Mul`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.compose import workflow, python\n", "from pydra.utils import show_workflow, print_help\n", "\n", "\n", "# Example python tasks\n", "@python.define\n", "def Add(a, b):\n", "    return a + b\n", "\n", "\n", "@python.define\n", "def Mul(a, b):\n", "    return a * b" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can create a simple workflow definition using `workflow.define` to decorate a function that constructs the workflow. Nodes are added to the workflow being constructed by calling the `workflow.add` function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@workflow.define\n", "def BasicWorkflow(a, b):\n", "    add = workflow.add(Add(a=a, b=b))\n", "    mul = workflow.add(Mul(a=add.out, b=b))\n", "    return mul.out\n", "\n", "\n", "print_help(BasicWorkflow)\n", "show_workflow(BasicWorkflow, figsize=(2, 2.5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`workflow.add` returns an \"outputs\" object corresponding to the definition added to the\n", "workflow. The fields of the outputs object can be referenced as inputs to downstream\n", "workflow nodes. 
Note that these output fields are just placeholders for the values that will\n", "be returned and can't be used in conditional statements during workflow construction\n", "(see [Dynamic construction](../explanation/conditional-lazy.html) for how to work around this\n", "limitation). The output fields to be returned by the workflow should be returned from the\n", "constructor function, in a tuple if there is more than one." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pydra.compose import shell\n", "from fileformats import image, video\n", "\n", "\n", "@workflow.define\n", "def ShellWorkflow(\n", "    input_video: video.Mp4,\n", "    watermark: image.Png,\n", "    watermark_dims: tuple[int, int] = (10, 10),\n", ") -> video.Mp4:\n", "\n", "    add_watermark = workflow.add(\n", "        shell.define(\n", "            \"ffmpeg -i -i \"\n", "            \"-filter_complex \"\n", "        )(\n", "            in_video=input_video,\n", "            watermark=watermark,\n", "            filter=\"overlay={}:{}\".format(*watermark_dims),\n", "        )\n", "    )\n", "    output_video = workflow.add(\n", "        shell.define(\n", "            \"HandBrakeCLI -i -o \"\n", "            \"--width --height \",\n", "        )(in_video=add_watermark.out_video, width=1280, height=720)\n", "    ).out_video\n", "\n", "    return output_video  # test implicit detection of output name\n", "\n", "\n", "print_help(ShellWorkflow)\n", "show_workflow(ShellWorkflow, figsize=(2.5, 3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Splitting/combining task inputs\n", "\n", "Sometimes, you might want to perform the same task over a set of input values/files, and then collect the results into a list to perform further processing. 
This can be achieved by using the `split` and `combine` methods." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python.define\n", "def Sum(x: list[float]) -> float:\n", "    return sum(x)\n", "\n", "\n", "@workflow.define\n", "def SplitWorkflow(a: list[int], b: list[float]) -> list[float]:\n", "    # Multiply over all combinations of the elements of a and b, then combine the results\n", "    # for each a element into a list over each b element\n", "    mul = workflow.add(Mul().split(a=a, b=b).combine(\"a\"))\n", "    # Sum the multiplications across all b elements for each a element\n", "    sum = workflow.add(Sum(x=mul.out))\n", "    return sum.out\n", "\n", "\n", "print_help(SplitWorkflow)\n", "show_workflow(SplitWorkflow, figsize=(2, 2.5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The combination doesn't have to be done in the same node as the split; in that case, the split propagates to downstream nodes until it is combined." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@workflow.define\n", "def SplitThenCombineWorkflow(a: list[int], b: list[float], c: float) -> list[float]:\n", "    mul = workflow.add(Mul().split(a=a, b=b))\n", "    add = workflow.add(Add(a=mul.out, b=c).combine(\"Mul.a\"))\n", "    sum = workflow.add(Sum(x=add.out))\n", "    return sum.out\n", "\n", "\n", "print_help(SplitThenCombineWorkflow)\n", "show_workflow(SplitThenCombineWorkflow, figsize=(3, 3.5))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For a more advanced discussion of the intricacies of splitting and combining, see [Splitting and combining](../explanation/splitting-combining.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Nested and conditional workflows\n", "\n", "One of the most powerful features of Pydra is the ability to use inline Python code to conditionally add nodes to, or omit them from, a workflow, and to alter the parameterisation of the nodes, depending on inputs to the 
workflow." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@workflow.define\n", "def ConditionalWorkflow(\n", "    input_video: video.Mp4,\n", "    watermark: image.Png,\n", "    watermark_dims: tuple[int, int] | None = None,\n", ") -> video.Mp4:\n", "\n", "    if watermark_dims is not None:\n", "        add_watermark = workflow.add(\n", "            shell.define(\n", "                \"ffmpeg -i -i \"\n", "                \"-filter_complex \"\n", "            )(\n", "                in_video=input_video,\n", "                watermark=watermark,\n", "                filter=\"overlay={}:{}\".format(*watermark_dims),\n", "            )\n", "        )\n", "        handbrake_input = add_watermark.out_video\n", "    else:\n", "        handbrake_input = input_video\n", "\n", "    output_video = workflow.add(\n", "        shell.define(\n", "            \"HandBrakeCLI -i -o \"\n", "            \"--width --height \",\n", "        )(in_video=handbrake_input, width=1280, height=720)\n", "    ).out_video\n", "\n", "    return output_video  # test implicit detection of output name\n", "\n", "\n", "print_help(ConditionalWorkflow)\n", "show_workflow(ConditionalWorkflow(watermark_dims=(10, 10)), figsize=(2.5, 3))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that outputs of upstream nodes cannot be used in conditional statements, since these are just placeholders at the time the workflow is being constructed. 
However, you can get around\n", "this limitation by placing the conditional logic within a nested workflow." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python.define\n", "def Subtract(x: float, y: float) -> float:\n", "    return x - y\n", "\n", "\n", "@workflow.define\n", "def RecursiveNestedWorkflow(a: float, depth: int) -> float:\n", "    add = workflow.add(Add(a=a, b=1))\n", "    decrement_depth = workflow.add(Subtract(x=depth, y=1))\n", "    if depth > 0:\n", "        out_node = workflow.add(\n", "            RecursiveNestedWorkflow(a=add.out, depth=decrement_depth.out)\n", "        )\n", "    else:\n", "        out_node = add\n", "    return out_node.out\n", "\n", "\n", "print_help(RecursiveNestedWorkflow)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For a more detailed discussion of the construction of conditional workflows and \"lazy field\"\n", "placeholders, see [Conditionals and lazy fields](../explanation/conditional-lazy.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Type-checking between nodes\n", "\n", "Pydra utilizes Python type annotations to implement strong type-checking, which is performed\n", "when values or upstream outputs are assigned to task inputs.\n", "\n", "Task input and output fields do not need to be assigned types, since they will default to `typing.Any`.\n", "However, if they are assigned a type and a value or output from an upstream node conflicts\n", "with the type, a `TypeError` will be raised at construction time.\n", "\n", "Note that the type-checking \"assumes the best\", and will pass if the upstream field is typed\n", "as `Any` or as a super-class of the field being assigned to. 
For example, an input of\n", "`fileformats.generic.File` can be passed to a field expecting a `fileformats.image.Png` file type,\n", "because `Png` is a subtype of `File`, whereas a `fileformats.image.Jpeg` input would fail,\n", "since it is clearly not the intended type.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fileformats import generic\n", "\n", "Mp4Handbrake = shell.define(\n", "    \"HandBrakeCLI -i -o \"\n", "    \"--width --height \",\n", ")\n", "\n", "\n", "QuicktimeHandbrake = shell.define(\n", "    \"HandBrakeCLI -i -o \"\n", "    \"--width --height \",\n", ")\n", "\n", "\n", "@workflow.define\n", "def TypeErrorWorkflow(\n", "    input_video: video.Mp4,\n", "    watermark: generic.File,\n", "    watermark_dims: tuple[int, int] = (10, 10),\n", ") -> video.Mp4:\n", "\n", "    add_watermark = workflow.add(\n", "        shell.define(\n", "            \"ffmpeg -i -i \"\n", "            \"-filter_complex \"\n", "        )(\n", "            in_video=input_video,  # This is OK because in_video is typed Any\n", "            watermark=watermark,  # Type is OK because generic.File is a superclass of image.Png\n", "            filter=\"overlay={}:{}\".format(*watermark_dims),\n", "        ),\n", "        name=\"add_watermark\",\n", "    )\n", "\n", "    try:\n", "        handbrake = workflow.add(\n", "            QuicktimeHandbrake(\n", "                in_video=add_watermark.out_video, width=1280, height=720\n", "            ),\n", "        )  # This will raise a TypeError because the input video is an Mp4\n", "    except TypeError:\n", "        handbrake = workflow.add(\n", "            Mp4Handbrake(in_video=add_watermark.out_video, width=1280, height=720),\n", "        )  # The type of the input video is now correct\n", "\n", "    return handbrake.out_video\n", "\n", "\n", "print_help(TypeErrorWorkflow)\n", "show_workflow(TypeErrorWorkflow, plot_type=\"detailed\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For a more detailed discussion of Pydra's type-checking, see [Type Checking](../explanation/typing.html)."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Accessing the workflow object\n", "\n", "If you need to access the workflow object being constructed from inside the constructor function, you can use `workflow.this()`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python.define(outputs=[\"divided\"])\n", "def Divide(x, y):\n", "    return x / y\n", "\n", "\n", "@workflow.define(outputs=[\"out1\", \"out2\"])\n", "def DirectAccessWorkflow(a: int, b: float) -> tuple[float, float]:\n", "    \"\"\"A test workflow demonstrating a few alternative ways to set and connect nodes\n", "\n", "    Args:\n", "        a: An integer input\n", "        b: A float input\n", "\n", "    Returns:\n", "        out1: The first output\n", "        out2: The second output\n", "    \"\"\"\n", "\n", "    wf = workflow.this()\n", "\n", "    add = wf.add(Add(a=a, b=b), name=\"addition\")\n", "    mul = wf.add(Mul(a=add.out, b=b))\n", "    divide = wf.add(Divide(x=wf[\"addition\"].lzout.out, y=mul.out), name=\"division\")\n", "\n", "    # Alter one of the inputs to a node after it has been initialised\n", "    wf[\"Mul\"].inputs.b *= 2\n", "\n", "    return mul.out, divide.divided\n", "\n", "\n", "print_help(DirectAccessWorkflow)\n", "show_workflow(DirectAccessWorkflow(b=1), plot_type=\"detailed\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Directly accessing the workflow being constructed also enables you to set the outputs of the workflow directly." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@workflow.define(outputs={\"out1\": float, \"out2\": float})\n", "def SetOutputsOfWorkflow(a: int, b: float):\n", "    \"\"\"A test workflow demonstrating a few alternative ways to set and connect nodes\n", "\n", "    Args:\n", "        a: An integer input\n", "        b: A float input\n", "\n", "    Returns:\n", "        out1: The first output\n", "        out2: The second output\n", "    \"\"\"\n", "\n", "    wf = workflow.this()\n", "\n", "    add = wf.add(Add(a=a, b=b), 
name=\"addition\")\n", "    mul = wf.add(Mul(a=add.out, b=b))\n", "    divide = wf.add(Divide(x=wf[\"addition\"].lzout.out, y=mul.out), name=\"division\")\n", "\n", "    # Alter one of the inputs to a node after it has been initialised\n", "    wf[\"Mul\"].inputs.b *= 2\n", "\n", "    # Set the outputs of the workflow directly\n", "    wf.outputs.out1 = mul.out\n", "    wf.outputs.out2 = divide.divided\n", "\n", "\n", "print_help(SetOutputsOfWorkflow)\n", "show_workflow(SetOutputsOfWorkflow(b=3), plot_type=\"detailed\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setting software environments per node\n", "\n", "The [Advanced execution tutorial](./2-advanced-execution.html) showed how the software\n", "environment (e.g. a Docker container) can be specified for shell tasks by passing the\n", "`environment` argument to the task execution/submission call. For shell tasks\n", "within workflows, the software environment is instead specified when adding\n", "the node to the workflow, e.g." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "from pathlib import Path\n", "import numpy as np\n", "from fileformats.medimage import Nifti1\n", "import fileformats.medimage_mrtrix3 as mrtrix3\n", "from pydra.environments import docker\n", "from pydra.compose import workflow, python\n", "from pydra.tasks.mrtrix3.v3_0 import MrConvert, MrThreshold\n", "\n", "MRTRIX2NUMPY_DTYPES = {\n", "    \"Int8\": np.dtype(\"i1\"),\n", "    \"UInt8\": np.dtype(\"u1\"),\n", "    \"Int16LE\": np.dtype(\"i2\"),\n", "    \"UInt16LE\": np.dtype(\"u2\"),\n", "    \"Int32LE\": np.dtype(\"i4\"),\n", "    \"UInt32LE\": np.dtype(\"u4\"),\n", "    \"Float32LE\": np.dtype(\"f4\"),\n", "    \"Float64LE\": np.dtype(\"f8\"),\n", "    \"CFloat32LE\": np.dtype(\"c8\"),\n", "    \"CFloat64LE\": np.dtype(\"c16\"),\n", "}\n", "\n", "\n", "@workflow.define(outputs=[\"out_image\"])\n", "def ToyMedianThreshold(in_image: Nifti1) -> mrtrix3.ImageFormat:\n", "    \"\"\"A toy example 
workflow that:\n", "\n", "    * converts a NIfTI image to MRTrix3 image format with a separate header\n", "    * loads the separate data file and selects the median value\n", "    * thresholds the input image at that median value\n", "    \"\"\"\n", "\n", "    input_conversion = workflow.add(\n", "        MrConvert(in_file=in_image, out_file=\"out_file.mih\"),\n", "        name=\"input_conversion\",\n", "        environment=docker.Environment(\"mrtrix3/mrtrix3\", tag=\"latest\"),\n", "    )\n", "\n", "    @python.define\n", "    def Median(mih: mrtrix3.ImageHeader) -> float:\n", "        \"\"\"A bespoke function that reads the separate data file in the MRTrix3 image\n", "        header format (i.e. .mih) and calculates the median value.\n", "\n", "        NB: We could use a MrStats task here, but this is just an example to show how\n", "        to use a bespoke function in a workflow.\n", "        \"\"\"\n", "        dtype = MRTRIX2NUMPY_DTYPES[mih.metadata[\"datatype\"].strip()]\n", "        data = np.frombuffer(Path(mih.data_file).read_bytes(), dtype=dtype)\n", "        return np.median(data)\n", "\n", "    median = workflow.add(Median(mih=input_conversion.out_file))\n", "\n", "    threshold = workflow.add(\n", "        MrThreshold(in_file=in_image, out_file=\"binary.mif\", abs=median.out),\n", "        environment=docker.Environment(\"mrtrix3/mrtrix3\", tag=\"latest\"),\n", "    )\n", "\n", "    output_conversion = workflow.add(\n", "        MrConvert(in_file=threshold.out_file, out_file=\"out_image.mif\"),\n", "        name=\"output_conversion\",\n", "        environment=docker.Environment(\"mrtrix3/mrtrix3\", tag=\"latest\"),\n", "    )\n", "\n", "    return output_conversion.out_file\n", "\n", "\n", "test_dir = Path(tempfile.mkdtemp())\n", "\n", "nifti_file = Nifti1.sample(test_dir, seed=0)\n", "\n", "wf = ToyMedianThreshold(in_image=nifti_file)\n", "\n", "outputs = wf(cache_root=test_dir / \"cache\")\n", "\n", "print(outputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "See [Containers and Environments](../explanation/environments.rst) for more details on\n", "how to utilise containers and add support for other software environments."
] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": "wf13", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 4 }