API
Python tasks
- pydra.compose.python.Outputs
alias of PythonOutputs
- pydra.compose.python.Task
alias of PythonTask
- class pydra.compose.python.arg(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False, allowed_values=NOTHING, copy_mode: CopyMode = CopyMode.any, copy_collation: CopyCollation = CopyCollation.any, copy_ext_decomp: ExtensionDecomposition = ExtensionDecomposition.single, readonly: bool = False)
Bases: Arg
Argument of a Python task
- Parameters:
help (str) -- A short description of the input field.
default (Any, optional) -- The default value for the argument
allowed_values (list, optional) -- List of allowed values for the field.
requires (list, optional) -- Names of the inputs that are required together with the field.
copy_mode (File.CopyMode, optional) -- The mode of copying the file, by default it is File.CopyMode.any
copy_collation (File.CopyCollation, optional) -- The collation of the file, by default it is File.CopyCollation.any
copy_ext_decomp (File.ExtensionDecomposition, optional) -- The extension decomposition of the file, by default it is File.ExtensionDecomposition.single
readonly (bool, optional) -- If True, the input field cannot be provided by the user; instead it aggregates other input fields (for example, fields with argstr: -o {fldA} {fldB}). By default it is False
type (type, optional) -- The type of the field, by default it is Any
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
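The allowed_values and requires constraints can be pictured with the toy check below. It is a sketch, not pydra's validation code. `check_field` is a hypothetical helper, and an unset field is represented by None. That is an assumption: pydra uses its own sentinel for unset values.

```python
from typing import Any, Collection, Mapping, Optional


def check_field(
    name: str,
    values: Mapping[str, Any],
    allowed_values: Optional[Collection[Any]] = None,
    requires: Collection[str] = (),
) -> None:
    """Toy check of allowed_values/requires for one field (None means unset)."""
    value = values.get(name)
    if value is None:
        return  # an unset field imposes no constraints in this sketch
    if allowed_values is not None and value not in allowed_values:
        raise ValueError(f"{name}={value!r} is not one of {list(allowed_values)}")
    # every field named in `requires` must be set together with this one
    missing = [req for req in requires if values.get(req) is None]
    if missing:
        raise ValueError(f"{name} requires {missing} to be set as well")
```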
- pydra.compose.python.define(wrapped: type | Callable | None = None, /, inputs: list[str | arg] | dict[str, arg | type] | None = None, outputs: list[str | out] | dict[str, out | type] | type | None = None, bases: Sequence[type] = (), outputs_bases: Sequence[type] = (), auto_attribs: bool = True, name: str | None = None, xor: Sequence[str | None] | Sequence[Sequence[str | None]] = ()) -> PythonTask
Create an interface for a function or a class.
- Parameters:
wrapped (type | callable | None) -- The function or class to create an interface for.
inputs (list[str | Arg] | dict[str, Arg | type] | None) -- The inputs to the function or class.
outputs (list[str | base.Out] | dict[str, base.Out | type] | type | None) -- The outputs of the function or class.
auto_attribs (bool) -- Whether to use auto_attribs mode when creating the class.
name (str | None) -- The name of the returned class
xor (Sequence[str | None] | Sequence[Sequence[str | None]], optional) -- Names of args that are mutually exclusive, which must include the name of the current field. If this list includes None, then none of the fields need to be set.
- Returns:
The task class for the Python function
- Return type:
Task
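The xor rule described above can be modelled as follows. This is a hypothetical sketch, not pydra's implementation. A flat list is treated as one group and a list of lists as several groups. Each group allows at most one field to be set, and exactly one must be set unless the group contains None. As before, None stands in for "unset" (an assumption).

```python
from typing import Any, List, Mapping, Optional, Sequence, Union

XorSpec = Union[Sequence[Optional[str]], Sequence[Sequence[Optional[str]]]]


def normalize_xor(xor: XorSpec) -> List[List[Optional[str]]]:
    """Turn a flat group or a list of groups into a list of groups."""
    if not xor:
        return []
    if all(item is None or isinstance(item, str) for item in xor):
        return [list(xor)]  # a single flat group
    return [list(group) for group in xor]


def check_xor(values: Mapping[str, Any], xor: XorSpec) -> None:
    for group in normalize_xor(xor):
        names = [n for n in group if n is not None]
        provided = [n for n in names if values.get(n) is not None]
        if len(provided) > 1:
            raise ValueError(f"mutually exclusive fields set together: {provided}")
        # None in the group means "setting none of them is also allowed"
        if not provided and None not in group:
            raise ValueError(f"exactly one of {names} must be set")
```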
- class pydra.compose.python.out(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False)
Bases: Out
Output of a Python task
- Parameters:
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
type (type, optional) -- The type of the field, by default it is Any
help (str, optional) -- A short description of the output field.
requires (list, optional) -- Names of the inputs that are required together with the field.
converter (callable, optional) -- The converter for the field passed through to the attrs.field, by default it is None
validator (callable | iterable[callable], optional) -- The validator(s) for the field passed through to the attrs.field, by default it is None
position (int) -- The position of the output in the output list, allows for tuple unpacking of outputs
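The idea of output positions enabling tuple unpacking is sketched below. `OutSpec` and `unpack_outputs` are hypothetical names, not pydra classes. In this model a single output receives the whole return value. Several outputs are matched to the elements of a returned tuple, in position order when every output has a position and in declaration order otherwise.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Sequence


@dataclass
class OutSpec:
    """Hypothetical stand-in for an output field with an optional position."""
    name: str
    position: Optional[int] = None


def unpack_outputs(result: Any, outputs: Sequence[OutSpec]) -> Dict[str, Any]:
    if len(outputs) == 1:
        return {outputs[0].name: result}  # one output gets the whole return value
    if all(o.position is not None for o in outputs):
        ordered = sorted(outputs, key=lambda o: o.position)
    else:
        ordered = list(outputs)  # fall back to declaration order
    if not isinstance(result, tuple) or len(result) != len(ordered):
        raise ValueError(f"expected a tuple of {len(ordered)} values, got {result!r}")
    return {o.name: value for o, value in zip(ordered, result)}
```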
Shell tasks
- pydra.compose.shell.Outputs
alias of ShellOutputs
- pydra.compose.shell.Task
alias of ShellTask
- class pydra.compose.shell.arg(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False, copy_mode: CopyMode = CopyMode.any, copy_collation: CopyCollation = CopyCollation.any, copy_ext_decomp: ExtensionDecomposition = ExtensionDecomposition.single, readonly: bool = False, argstr: str | None = '', position: int | None = None, sep: str = ' ', allowed_values: list | None = None, container_path: bool = False, formatter: Callable | None = None)
Bases: Arg
An input field that specifies a command line argument
- Parameters:
help (str) -- A short description of the input field.
default (Any, optional) -- The default value for the argument
mandatory (bool, optional) -- If True, the user has to provide a value for the field, by default it is False
allowed_values (list, optional) -- List of allowed values for the field.
requires (list, optional) -- List of field names that are required together with the field.
copy_mode (File.CopyMode, optional) -- The mode of copying the file, by default it is File.CopyMode.any
copy_collation (File.CopyCollation, optional) -- The collation of the file, by default it is File.CopyCollation.any
copy_ext_decomp (File.ExtensionDecomposition, optional) -- The extension decomposition of the file, by default it is File.ExtensionDecomposition.single
readonly (bool, optional) -- If True, the input field cannot be provided by the user; instead it aggregates other input fields (for example, fields with argstr: -o {fldA} {fldB}). By default it is False
type (type, optional) -- The type of the field, by default it is Any
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
argstr (str, optional) -- A flag or string that is used in the command before the value, e.g. -v or -v {inp_field}. It can also be an empty string, "", in which case only the value is printed to the command line. If "..." is used, e.g. -v..., the flag is placed before every element when a list is provided as a value. If argstr is None, the field is not part of the command.
position (int, optional) -- Position of the field in the command; it can be a non-negative or negative integer. If no position is provided, the field is inserted after all fields with non-negative positions and before all fields with negative positions.
sep (str, optional) -- A separator used when a sequence type is provided as a value, by default " ".
container_path (bool, optional) -- If True, the path is treated as a path inside the container rather than a local path, by default it is False
formatter (function, optional) -- If provided, the argstr of the field is created using this function. It can be used, for example, to combine several inputs into one command argument. The function can accept field (this input field is passed), inputs (the entire set of inputs is passed), or any input field name (the value of that specific field is passed).
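The argstr, sep and position rules above can be modelled as follows. This is a toy sketch under stated assumptions, not pydra's command-line builder. `format_arg` and `order_by_position` are hypothetical helpers, and a None value is treated as "unset".

```python
from typing import Any, List, Optional, Sequence, Tuple


def format_arg(name: str, value: Any, argstr: Optional[str] = "", sep: str = " ") -> str:
    """Render one field as a command-line fragment (toy model)."""
    if argstr is None or value is None:
        return ""  # argstr=None: the field is not part of the command
    is_seq = isinstance(value, (list, tuple))
    if is_seq and argstr.endswith("..."):
        flag = argstr[:-3]
        # "-v..." repeats the flag before every element of the list
        return " ".join(f"{flag} {item}".strip() for item in value)
    text = sep.join(str(item) for item in value) if is_seq else str(value)
    placeholder = "{" + name + "}"
    if placeholder in argstr:
        return argstr.replace(placeholder, text)  # e.g. "-o {out_file}"
    return f"{argstr} {text}".strip()  # argstr="" prints just the value


def order_by_position(fields: Sequence[Tuple[str, Optional[int]]]) -> List[str]:
    """Non-negative positions first, then unpositioned fields, then negative ones."""
    non_negative = sorted((f for f in fields if f[1] is not None and f[1] >= 0), key=lambda f: f[1])
    unpositioned = [f for f in fields if f[1] is None]
    negative = sorted((f for f in fields if f[1] is not None and f[1] < 0), key=lambda f: f[1])
    return [name for name, _ in non_negative + unpositioned + negative]
```

In this model -1 sorts last and -2 just before it, which mirrors counting from the end of the command.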
- pydra.compose.shell.define(wrapped: type | str | None = None, /, inputs: list[str | Arg] | dict[str, Arg | type] | None = None, outputs: list[str | Out] | dict[str, Out | type] | type | None = None, bases: Sequence[type] = (), outputs_bases: Sequence[type] = (), auto_attribs: bool = True, name: str | None = None, xor: Sequence[str | None] | Sequence[Sequence[str | None]] = ()) -> ShellTask
Create a task for a shell command. It can be used either as a decorator on the "canonical" dataclass form of a task, or as a function that takes a "shell-command template string" of the form
`shell.define("command <input1> <input2> --output <out|output1>")`
Fields are inferred from the template if they are not provided. In the template, inputs are specified with <fieldname> and outputs with <out|fieldname>:
`my_command <myinput> <out|myoutput2>`
The types of the fields can be specified using their MIME-like names (see fileformats.core.from_mime), e.g.
`my_command <myinput:text/csv> <out|myoutput2:image/png>`
The template can also specify options with - or -- followed by the option name, and arguments with <argname:type>. The type is optional; if it is not provided, it defaults to generic/fs-object for arguments and field/text for options. The file-formats namespace can be dropped for generic and field formats, e.g.
`another-command <input1:directory> <input2:int> --output <out|output1:text/csv>`
- Parameters:
wrapped (type | str | None) -- The class or command line template to create an interface for
inputs (list[str | Arg] | dict[str, Arg | type] | None) -- The input fields of the shell command
outputs (list[str | Out] | dict[str, Out | type] | type | None) -- The output fields of the shell command
auto_attribs (bool) -- Whether to use auto_attribs mode when creating the class
args_last (bool) -- Whether to put the executable argument last in the command line instead of first as they appear in the template
name (str | None) -- The name of the returned class
xor (Sequence[str | None] | Sequence[Sequence[str | None]], optional) -- Names of args that are mutually exclusive, which must include the name of the current field. If this list includes None, then none of the fields need to be set.
- Returns:
The interface for the shell command
- Return type:
Task
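The template syntax described for shell.define can be illustrated with the toy parser below. It only shows how <name>, <name:type> and <out|name:type> tokens map to fields. `parse_template` and `TemplateField` are hypothetical, and this is not pydra's parser. It ignores bare options that have no <...> argument, and it does not resolve shortened names such as "directory" into full fileformats namespaces.

```python
import re
from dataclasses import dataclass
from typing import List, Optional, Tuple

# Matches <name>, <name:type>, <out|name> and <out|name:type>
FIELD_RE = re.compile(r"<(?P<out>out\|)?(?P<name>\w+)(?::(?P<type>[^>]+))?>")


@dataclass
class TemplateField:
    name: str
    type: str
    is_output: bool
    flag: Optional[str]  # the "-x"/"--xyz" option token preceding the field, if any


def parse_template(template: str, default_type: str = "generic/fs-object") -> Tuple[str, List[TemplateField]]:
    tokens = template.split()
    executable, fields, pending_flag = tokens[0], [], None
    for token in tokens[1:]:
        match = FIELD_RE.fullmatch(token)
        if match is None:
            # remember an option flag so it can be attached to the next field
            pending_flag = token if token.startswith("-") else None
            continue
        fields.append(
            TemplateField(
                name=match["name"],
                type=match["type"] or default_type,
                is_output=match["out"] is not None,
                flag=pending_flag,
            )
        )
        pending_flag = None
    return executable, fields
```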
- class pydra.compose.shell.out(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False, callable: Callable | None = None)
Bases: Out
An output field that specifies a command line argument
- Parameters:
callable (Callable, optional) -- If provided, the output file name (or list of file names) is created using this function. The function can accept field (the specific output field is passed), cache_dir (the task cache_dir is passed), stdout and stderr (the stdout and stderr of the task are passed), inputs (the entire set of inputs is passed), or any input field name (the value of that specific field is passed).
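The "request only what you need" convention for callable can be sketched with inspect.signature. Each parameter name the function declares is looked up among the special names (field, inputs, cache_dir, stdout, stderr) and then among the input field names. `call_output_callable` is a hypothetical helper, not pydra's dispatcher. Giving the special names precedence over inputs with the same name is an assumption of this sketch.

```python
import inspect
from typing import Any, Callable, Mapping


def call_output_callable(
    func: Callable[..., Any],
    *,
    field: Any,
    inputs: Mapping[str, Any],
    cache_dir: str,
    stdout: str,
    stderr: str,
) -> Any:
    special = {"field": field, "inputs": inputs, "cache_dir": cache_dir, "stdout": stdout, "stderr": stderr}
    available = {**inputs, **special}  # special names win over same-named inputs
    kwargs = {}
    for param in inspect.signature(func).parameters:
        if param not in available:
            raise TypeError(f"cannot supply argument {param!r}")
        kwargs[param] = available[param]
    return func(**kwargs)
```

A function that only declares `stdout` is called with nothing else, so simple callables stay simple.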
- class pydra.compose.shell.outarg(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False, copy_mode: CopyMode = CopyMode.any, copy_collation: CopyCollation = CopyCollation.any, copy_ext_decomp: ExtensionDecomposition = ExtensionDecomposition.single, readonly: bool = False, argstr: str | None = '', position: int | None = None, sep: str = ' ', allowed_values: list | None = None, container_path: bool = False, formatter: Callable | None = None, path_template: str | None = None, keep_extension: bool = True)
Bases: arg, Out
An input field that specifies where to save the output file
- Parameters:
help (str) -- A short description of the input field.
default (Any, optional) -- The default value for the argument
mandatory (bool, optional) -- If True, the user has to provide a value for the field, by default it is False
allowed_values (list, optional) -- List of allowed values for the field.
requires (list, optional) -- List of field names that are required together with the field.
copy_mode (File.CopyMode, optional) -- The mode of copying the file, by default it is File.CopyMode.any
copy_collation (File.CopyCollation, optional) -- The collation of the file, by default it is File.CopyCollation.any
copy_ext_decomp (File.ExtensionDecomposition, optional) -- The extension decomposition of the file, by default it is File.ExtensionDecomposition.single
readonly (bool, optional) -- If True, the input field cannot be provided by the user; instead it aggregates other input fields (for example, fields with argstr: -o {fldA} {fldB}). By default it is False
type (type, optional) -- The type of the field, by default it is Any
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
argstr (str, optional) -- A flag or string that is used in the command before the value, e.g. -v or -v {inp_field}; it can also be an empty string, "". If "..." is used, e.g. -v..., the flag is placed before every element when a list is provided as a value. If no argstr is used, the field is not part of the command.
position (int, optional) -- Position of the field in the command line; it can be a non-negative or negative integer. If no position is provided, the field is inserted after all fields with non-negative positions and before all fields with negative positions.
sep (str, optional) -- A separator used when a list is provided as a value.
container_path (bool, optional) -- If True, the path is treated as a path inside the container rather than a local path, by default it is False
formatter (function, optional) -- If provided, the argstr of the field is created using this function. It can be used, for example, to combine several inputs into one command argument. The function can accept field (this input field is passed), inputs (the entire set of inputs is passed), or any input field name (the value of that specific field is passed).
path_template (str, optional) -- The template that specifies where the output file will be written. It can reference other fields, e.g. {file1}. It is used to create an output definition.
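A path_template such as "{in_file}_brain" can be resolved as in the sketch below. Each {field} reference is replaced by that input's file name without its extension. The `keep_extension` flag in the signature above is modelled here as re-appending the extension of the first referenced file, unless the template already supplies its own. That reading of keep_extension is an assumption, and `resolve_path_template` is a hypothetical helper, not pydra's implementation.

```python
import re
from pathlib import PurePosixPath
from typing import Any, Mapping

FIELD_REF = re.compile(r"\{(\w+)\}")


def resolve_path_template(template: str, inputs: Mapping[str, Any], keep_extension: bool = True) -> str:
    extension = ""

    def substitute(match: "re.Match[str]") -> str:
        nonlocal extension
        value = PurePosixPath(str(inputs[match.group(1)]))
        suffix = "".join(value.suffixes)  # e.g. ".nii.gz"
        if suffix and not extension:
            extension = suffix  # remember the first referenced file's extension
        return value.name[: len(value.name) - len(suffix)]  # file name minus extension

    resolved = FIELD_REF.sub(substitute, template)
    if keep_extension and not PurePosixPath(resolved).suffix:
        resolved += extension
    return resolved
```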
- OPTIONAL_PATH_TEMPLATE_HELP = "The path specified for the output file, if True, the default 'path template' will be used.If False or None, the output file will not be saved."
- PATH_TEMPLATE_HELP = "The path specified for the output file, if True, the default 'path template' will be used."
- markdown_listing(line_width: int = 79, help_indent: int = 4, as_input: bool = False, **kwargs)
Get the listing for the field in markdown-like format
- Parameters:
line_width (int) -- The maximum line width for the output, by default it is 79
help_indent (int) -- The indentation for the help text, by default it is 4
as_input (bool) -- Whether to format the field as an input or output if it can be both, by default it is False
**kwargs (Any) -- Additional arguments to allow it to be duck-typed with extension classes
- Returns:
The listing for the field in markdown-like format
- Return type:
Workflows
- pydra.compose.workflow.Outputs
alias of WorkflowOutputs
- pydra.compose.workflow.Task
alias of WorkflowTask
- pydra.compose.workflow.add(task: Task[OutputsType], name: str | None = None, environment: Environment | None = None, hooks: TaskHooks | None = None) -> OutputsType
Add a node to the workflow currently being constructed
- Parameters:
task (Task) -- The definition of the task to add to the workflow as a node
name (str, optional) -- The name of the node, by default it will be the name of the task class
environment (Environment, optional) -- The environment to run the task in, such as a Docker or Singularity container, by default the "native" environment
hooks (TaskHooks, optional) -- The hooks to run before or after the task, by default no hooks will be run
- Returns:
The outputs of the node
- Return type:
Outputs
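The notion of "the workflow currently being constructed" can be pictured with a context variable. While a workflow is being built it is the active one, and add() registers nodes on it, naming each node after the task class by default. This is a toy model only. `ToyWorkflow` and this `add` are hypothetical. The real add returns the node's lazy outputs, whereas this toy returns the node name. Rejecting duplicate names is a choice made for the sketch.

```python
import contextvars
from dataclasses import dataclass, field
from typing import Any, Optional

# Hypothetical stand-in for "the workflow currently being constructed"
_current_workflow = contextvars.ContextVar("current_workflow")


@dataclass
class ToyWorkflow:
    nodes: dict = field(default_factory=dict)

    def __enter__(self) -> "ToyWorkflow":
        self._token = _current_workflow.set(self)
        return self

    def __exit__(self, *exc_info: Any) -> None:
        _current_workflow.reset(self._token)


def add(task: Any, name: Optional[str] = None) -> str:
    workflow = _current_workflow.get()  # LookupError if no workflow is being built
    node_name = name or type(task).__name__  # default: the task's class name
    if node_name in workflow.nodes:
        raise ValueError(f"node {node_name!r} already exists; pass an explicit name")
    workflow.nodes[node_name] = task
    return node_name
```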
- class pydra.compose.workflow.arg(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False, allowed_values=NOTHING, copy_mode: CopyMode = CopyMode.any, copy_collation: CopyCollation = CopyCollation.any, copy_ext_decomp: ExtensionDecomposition = ExtensionDecomposition.single, readonly: bool = False)
Bases: Arg
Argument of a workflow task
- Parameters:
help (str) -- A short description of the input field.
default (Any, optional) -- The default value for the argument
allowed_values (list, optional) -- List of allowed values for the field.
requires (list, optional) -- Names of the inputs that are required together with the field.
copy_mode (File.CopyMode, optional) -- The mode of copying the file, by default it is File.CopyMode.any
copy_collation (File.CopyCollation, optional) -- The collation of the file, by default it is File.CopyCollation.any
copy_ext_decomp (File.ExtensionDecomposition, optional) -- The extension decomposition of the file, by default it is File.ExtensionDecomposition.single
readonly (bool, optional) -- If True, the input field cannot be provided by the user; instead it aggregates other input fields (for example, fields with argstr: -o {fldA} {fldB}). By default it is False
type (type, optional) -- The type of the field, by default it is Any
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
lazy (bool, optional) -- If True the input field is not required at construction time but is passed straight through to the tasks, by default it is False
- pydra.compose.workflow.cast(field: Any, new_type: type[U]) -> U
Cast a lazy field to a new type. Note that the return annotation in the signature is a white lie: the returned field is actually a LazyField acting as a placeholder for an object of type U.
- Parameters:
field (LazyField[T]) -- The field to cast
new_type (type[U]) -- The new type to cast the field to
- Returns:
A copy of the lazy field with the new type
- Return type:
LazyField[U]
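Because a lazy field is only a placeholder during construction, casting it simply yields a copy with a different declared type, and no value is converted. The frozen dataclass below models that. `ToyLazyField` is hypothetical and is not pydra's LazyField.

```python
from dataclasses import dataclass, replace
from typing import Any, Generic, Type, TypeVar

T = TypeVar("T")
U = TypeVar("U")


@dataclass(frozen=True)
class ToyLazyField(Generic[T]):
    """Hypothetical placeholder for output `field` of workflow node `node`."""
    node: str
    field: str
    type: Any


def cast(field: ToyLazyField[Any], new_type: Type[U]) -> ToyLazyField[U]:
    # Return a copy that differs only in its declared type; nothing is
    # converted because no value exists yet at construction time.
    return replace(field, type=new_type)
```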
- pydra.compose.workflow.define(wrapped: type | Callable | None = None, /, inputs: list[str | arg] | dict[str, arg | type] | None = None, outputs: list[str | out] | dict[str, out | type] | type | None = None, bases: Sequence[type] = (), outputs_bases: Sequence[type] = (), lazy: list[str] | None = None, auto_attribs: bool = True, name: str | None = None, xor: Sequence[str | None] | Sequence[Sequence[str | None]] = ()) -> WorkflowTask
Create an interface for a function or a class. Can be used either as a decorator on a constructor function or the "canonical" dataclass-form of a task.
- Parameters:
wrapped (type | callable | None) -- The function or class to create an interface for.
inputs (list[str | Arg] | dict[str, Arg | type] | None) -- The inputs to the function or class.
outputs (list[str | Out] | dict[str, Out | type] | type | None) -- The outputs of the function or class.
auto_attribs (bool) -- Whether to use auto_attribs mode when creating the class.
name (str | None) -- The name of the returned class
xor (Sequence[str | None] | Sequence[Sequence[str | None]], optional) -- Names of args that are mutually exclusive, which must include the name of the current field. If this list includes None, then none of the fields need to be set.
- Returns:
The interface for the function or class.
- Return type:
Task
- class pydra.compose.workflow.out(*, name: str | None = None, type=typing.Any, default: Any = NO_DEFAULT, help: str = '', requires: str | Collection[Requirement | str | Collection[str | tuple[str, Collection[Any]]]] = NOTHING, converter: Callable[[...], Any] | None = None, validator: Callable[[...], bool] | None = None, hash_eq: bool = False)
Bases: Out
Output of a workflow task
- Parameters:
name (str, optional) -- The name of the field, used when specifying a list of fields instead of a mapping from name to field, by default it is None
type (type, optional) -- The type of the field, by default it is Any
help (str, optional) -- A short description of the output field.
requires (list, optional) -- Names of the inputs that are required together with the field.
converter (callable, optional) -- The converter for the field passed through to the attrs.field, by default it is None
validator (callable | iterable[callable], optional) -- The validator(s) for the field passed through to the attrs.field, by default it is None
Engine classes
The core of the workflow engine.
- class pydra.engine.Submitter(cache_root: os.PathLike | None = None, worker: str | typing.Type[pydra.workers.base.Worker] | pydra.workers.base.Worker | None = 'debug', environment: Environment | None = None, readonly_caches: list[os.PathLike] | None = None, audit_flags: pydra.utils.messenger.AuditFlag = AuditFlag.NONE, messengers: typing.Iterable[pydra.utils.messenger.Messenger] | None = None, messenger_args: dict[str, typing.Any] | None = None, max_concurrent: int | float = inf, propagate_rerun: bool = True, clean_stale_locks: bool | None = None, **kwargs)
Bases: object
Send a job to the execution backend.
- Parameters:
cache_root (os.PathLike, optional) -- Cache directory where the working directory/results for the job will be stored, by default None
worker (str or Worker, optional) -- The worker to use, by default "debug"
environment (Environment, optional) -- The execution environment to use, by default None
readonly_caches (list[os.PathLike], optional) -- Alternate cache locations to check for pre-computed results, by default None
max_concurrent (int | float, optional) -- Maximum number of concurrent tasks to run, by default float("inf") (unlimited)
audit_flags (AuditFlag, optional) -- Configure provenance tracking. Available flags: AuditFlag. The default is no provenance tracking.
messengers (Messenger or list of Messenger or None) -- Messenger(s) used by Audit. Saved in the audit attribute. See Messenger for the available options.
messenger_args (dict[str, Any], optional) -- Argument(s) used by the messenger. Saved in the audit attribute.
clean_stale_locks (bool, optional) -- Whether to clean stale lock files, i.e. lock files that were created before the start of the current run. Don't set if using a global cache where there are potentially multiple workflows that are running concurrently. By default (None), lock files will be cleaned if the debug worker is used
**kwargs (dict) -- Keyword arguments to pass on to the worker initialisation
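A concurrency cap like max_concurrent is commonly implemented with a semaphore. The sketch below uses asyncio.Semaphore to run coroutine jobs at most N at a time, and treats the default float("inf") as "no limit". `run_all` is a hypothetical helper and is not how pydra's Submitter or workers are written.

```python
import asyncio
import math
from typing import Any, Awaitable, Callable, Iterable, List


async def run_all(jobs: Iterable[Callable[[], Awaitable[Any]]], max_concurrent: float = math.inf) -> List[Any]:
    """Run zero-argument coroutine functions, at most max_concurrent at a time."""
    limit = None if math.isinf(max_concurrent) else asyncio.Semaphore(int(max_concurrent))

    async def guarded(job: Callable[[], Awaitable[Any]]) -> Any:
        if limit is None:
            return await job()  # unlimited: no gating
        async with limit:  # wait for a free slot before starting the job
            return await job()

    # gather preserves the order of results regardless of completion order
    return await asyncio.gather(*(guarded(job) for job in jobs))
```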
- audit_flags: AuditFlag
- property cache_root
Get the location of the cache directory.
- expand_workflow(workflow_task: Job[WorkflowTask], rerun: bool) -> None
Expands and executes a workflow job synchronously. Typically only used during debugging and testing, as the asynchronous version is more efficient.
- Parameters:
workflow_task (Job[WorkflowTask]) -- Workflow Job object
- async expand_workflow_async(workflow_task: Job[WorkflowTask], rerun: bool) -> None
Expand and execute a workflow job asynchronously.
- Parameters:
workflow_task (Job[WorkflowTask]) -- Workflow Job object
- async fetch_finished(futures) -> tuple[set[Task], set[Task]]
Awaits the given asyncio.Task objects until at least one of them has finished.
- Parameters:
futures (set of asyncio awaitables) -- Job execution coroutines or asyncio.Task objects
- Returns:
pending (set) -- Pending asyncio.Task objects
done (set) -- Completed asyncio.Task objects
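The "wait until one is finished" behaviour corresponds to asyncio.wait with return_when=asyncio.FIRST_COMPLETED. The sketch below is a stand-alone approximation, not pydra's method body. It wraps bare coroutines in tasks, because recent Python versions require asyncio.wait to receive tasks or futures. It returns (pending, done) in the order documented above.

```python
import asyncio
from typing import Awaitable, Iterable, Set, Tuple


async def fetch_finished(futures: Iterable[Awaitable]) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    # ensure_future returns existing tasks unchanged and wraps coroutines
    tasks = {asyncio.ensure_future(f) for f in futures}
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    return pending, done
```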
- get_runnable_tasks(graph: DiGraph) -> list[Job[TaskType]]
Parse a graph and return all runnable tasks.
- Parameters:
graph (DiGraph) -- Graph object
- Returns:
tasks (list of Job) -- List of runnable tasks
following_err (dict[NodeToExecute, list[str]]) -- Dictionary of tasks that are blocked by errored tasks
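The two return values above (runnable tasks and tasks blocked by errors) can be modelled on a plain predecessor map instead of pydra's DiGraph. A node is runnable when it has not started and all its predecessors are done. It is blocked when any direct predecessor errored. `get_runnable` is hypothetical, and unlike a full scheduler it only checks direct predecessors.

```python
from typing import Dict, List, Set, Tuple


def get_runnable(
    predecessors: Dict[str, Set[str]],
    done: Set[str],
    errored: Set[str],
    started: Set[str],
) -> Tuple[List[str], Dict[str, List[str]]]:
    runnable: List[str] = []
    blocked: Dict[str, List[str]] = {}
    for node, preds in predecessors.items():
        if node in done or node in errored or node in started:
            continue  # already handled
        failed = sorted(p for p in preds if p in errored)
        if failed:
            blocked[node] = failed  # cannot run because an upstream node errored
        elif preds <= done:
            runnable.append(node)  # every dependency has finished successfully
    return runnable, blocked
```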
- submit(job: Job[TaskType], rerun: bool = False) -> None
Submit a job to the worker.
- Parameters:
job (Job) -- The job to submit
rerun (bool, optional) -- Whether to force the re-computation of the job results even if existing results are found, by default False
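The effect of rerun is a cache-bypass switch. Without it, an existing result for the same job is reused; with it, the result is recomputed and the cached entry overwritten. The dictionary-backed model below illustrates only that decision. `submit_cached` is hypothetical and does not reflect how pydra hashes jobs or stores results on disk.

```python
from typing import Any, Callable, Dict, Hashable


def submit_cached(key: Hashable, compute: Callable[[], Any], cache: Dict[Hashable, Any], rerun: bool = False) -> Any:
    if not rerun and key in cache:
        return cache[key]  # reuse the previously computed result
    cache[key] = compute()  # (re)compute and overwrite the cached result
    return cache[key]
```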
- worker: Worker