Software environments

Pydra supports running tasks within encapsulated software environments, such as Docker and Singularity containers. This can be specified at runtime or during workflow construction, and allows tasks to be run in environments that are isolated from the host system, and that have specific software dependencies.

The environment a task runs within is specified by the environment argument passed to the execution call (e.g. my_task(worker="cf", environment="docker")) or in the workflow.add() call in workflow constructors.

Specifying at execution

The environment for a task can be specified at execution time by passing the environment argument to the task call. This can be an instance of pydra.environments.native.Environment (for the host system), pydra.environments.docker.Environment (for Docker containers), or pydra.environments.singularity.Environment (for Singularity containers), or a custom environment.

Example:

from pydra.environments import native, docker, singularity
from pydra.compose import shell

# Define a simple shell task
Shelly = shell.define("echo <text:str>")

shelly = Shelly(text="Hello, Pydra!")

# Execute with a native environment
outputs_native = shelly(environment=native.Environment())

# Execute with a Docker environment (assuming busybox image is available)
outputs_docker = shelly(environment=docker.Environment(image="busybox"))

# Execute with a Singularity environment (assuming an image is available)
outputs_singularity = shelly(
    environment=singularity.Environment(image="/path/to/image.sif")
)

Alternatively, when using a pydra.engine.submitter.Submitter, the environment can be specified in the Submitter constructor:

from pydra.engine.submitter import Submitter
from pydra.environments import native
from pydra.compose import shell

Shelly = shell.define("echo <text:str>")
shelly = Shelly(text="Hello, Pydra!")
with Submitter(environment=native.Environment()) as sub:
    result = sub(shelly)

Specifying at workflow construction

When constructing a workflow, the environment can be specified in the workflow.add() call. This ensures that all tasks within that workflow branch will execute in the specified environment.

Example:

from pydra.environments import singularity
from pydra.compose import workflow, shell
from fileformats.generic import File

image = "/path/to/my_singularity_image.sif"  # Replace with your Singularity image path

Singu = shell.define("cat <file>")


def MyWorkflow(file: File) -> str:
    singu_task = workflow.add(
        Singu(file=file),
        environment=singularity.Environment(image=image),
    )
    return singu_task.stdout

# Now you can use MyWorkflow, and the 'cat' task will run in the Singularity environment

Implementing new environment types

Custom environment types can be implemented by creating a new class that inherits from pydra.environments.Environment. These custom environment classes are typically located in the pydra/environments/ directory.

Example (simplified custom environment):

from pydra.environments.base import Environment as PydraEnvironment
import typing as ty


class MyCustomEnvironment(PydraEnvironment):
    def __init__(self, some_config: str):
        super().__init__()
        self.some_config = some_config

    def setup(self):
        # Logic to set up the custom environment
        print(f"Setting up custom environment with config: {self.some_config}")

    def execute(self, job: "Job[shell.Task]") -> dict[str, ty.Any]:
        # Logic to execute a command within the custom environment
        # This is where you would integrate with a custom execution system
        print(f"Executing command: '{job.task.cmdline}' in custom environment")
        # For demonstration, just return a dummy result
        return {"stdout": "Custom environment output", "stderr": "", "return_code": 0}

    def teardown(self):
        # Logic to tear down the custom environment
        print("Tearing down custom environment")

Then, you can use your custom environment like any other built-in environment:

from pydra.compose import shell
from pydra.engine.job import Job

# Assume MyCustomEnvironment is defined as above
my_task = shell.define("echo <text:str>")(text="Hello from custom env")
outputs = my_task(environment=MyCustomEnvironment(some_config="test"))
print(outputs.stdout)