Warning: This document is for an old version of Pydra: A simple dataflow engine with scalable semantics. The main version is master.

pydra.engine.helpers module

Administrative support for the engine framework.

class pydra.engine.helpers.PydraFileLock(lockfile)

Bases: object

Wrapper for filelock’s SoftFileLock that makes it work with asyncio.

pydra.engine.helpers.argstr_formatting(argstr, inputs, value_updates=None)

formatting argstr that have form {field_name}, using values from inputs and updating with value_update if provided

pydra.engine.helpers.copyfile_workflow(wf_path, result)

if file in the wf results, the file will be copied to the workflow directory

pydra.engine.helpers.create_checksum(name, inputs)

Generate a checksum name for a given combination of task name and inputs.

Parameters:
  • name (str) – Task name.

  • inputs (str) – String of inputs.

pydra.engine.helpers.custom_validator(instance, attribute, value)

simple custom validation take into account ty.Union, ty.List, ty.Dict (but only one level depth) adding an additional validator, if allowe_values provided

pydra.engine.helpers.ensure_list(obj, tuple2list=False)

Return a list whatever the input object is.

Examples

>>> ensure_list(list("abc"))
['a', 'b', 'c']
>>> ensure_list("abc")
['abc']
>>> ensure_list(tuple("abc"))
[('a', 'b', 'c')]
>>> ensure_list(tuple("abc"), tuple2list=True)
['a', 'b', 'c']
>>> ensure_list(None)
[]
>>> ensure_list(5.0)
[5.0]
pydra.engine.helpers.execute(cmd, strip=False)

Run the event loop with coroutine.

Uses read_and_display_async() unless a loop is already running, in which case read_and_display() is used.

Parameters:
  • cmd (list or tuple) – The command line to be executed.

  • strip (bool) – TODO

pydra.engine.helpers.gather_runtime_info(fname)

Extract runtime information from a file.

Parameters:

fname (os.pathlike) – The file containing runtime information

Returns:

runtime – A runtime object containing the collected information.

Return type:

Runtime

pydra.engine.helpers.get_available_cpus()

Return the number of CPUs available to the current process or, if that is not available, the total number of CPUs on the system.

Returns:

n_proc – The number of available CPUs.

Return type:

int

pydra.engine.helpers.get_open_loop()

Get current event loop.

If the loop is closed, a new loop is created and set as the current event loop.

Returns:

loop – The current event loop

Return type:

asyncio.EventLoop

pydra.engine.helpers.hash_function(obj)

Generate hash of object.

pydra.engine.helpers.hash_value(value, tp=None, metadata=None, precalculated=None)

calculating hash or returning values recursively

pydra.engine.helpers.load_and_run(task_pkl, ind=None, rerun=False, submitter=None, plugin=None, **kwargs)

loading a task from a pickle file, settings proper input and running the task

async pydra.engine.helpers.load_and_run_async(task_pkl, ind=None, submitter=None, rerun=False, **kwargs)

loading a task from a pickle file, settings proper input and running the workflow

pydra.engine.helpers.load_result(checksum, cache_locations)

Restore a result from the cache.

Parameters:
  • checksum (str) – Unique identifier of the task to be loaded.

  • cache_locations (list of os.pathlike) – List of cache directories, in order of priority, where the checksum will be looked for.

pydra.engine.helpers.load_task(task_pkl, ind=None)

loading a task from a pickle file, settings proper input for the specific ind

pydra.engine.helpers.make_klass(spec)

Create a data class given a spec.

Parameters:

spec – TODO

pydra.engine.helpers.output_from_inputfields(output_spec, input_spec)

Collect values from output from input fields. If names_only is False, the output_spec is updated, if names_only is True only the names are returned

Parameters:
  • output_spec – TODO

  • input_spec – TODO

pydra.engine.helpers.position_sort(args)

Sort objects by position, following Python indexing conventions.

Ordering is positive positions, lowest to highest, followed by unspecified positions (None) and negative positions, lowest to highest.

>>> position_sort([(None, "d"), (-3, "e"), (2, "b"), (-2, "f"), (5, "c"), (1, "a")])
['a', 'b', 'c', 'd', 'e', 'f']
Parameters:

args (list of (int/None, object) tuples)

Return type:

list of objects

pydra.engine.helpers.print_help(obj)

Visit a task object and print its input/output interface.

pydra.engine.helpers.read_and_display(*cmd, strip=False, hide_display=False)

Capture a process’ standard output.

async pydra.engine.helpers.read_and_display_async(*cmd, hide_display=False, strip=False)

Capture standard input and output of a process, displaying them as they arrive.

Works line-by-line.

async pydra.engine.helpers.read_stream_and_display(stream, display)

Read from stream line by line until EOF, display, and capture the lines.

pydra.engine.helpers.record_error(error_path, error)

Write an error file.

pydra.engine.helpers.save(task_path: Path, result=None, task=None, name_prefix=None)

Save a TaskBase object and/or results.

Parameters:
  • task_path (Path) – Write directory

  • result (Result) – Result to pickle and write

  • task (TaskBase) – Task to pickle and write

pydra.engine.helpers.task_hash(task)

Calculate the checksum of a task.

input hash, output hash, environment hash

Parameters:

task (TaskBase) – The input task.