Warning: This document is for the development version of Pydra: A simple dataflow engine with scalable semantics. The main version is master.

pydra.engine.workers module

Execution workers.

class pydra.engine.workers.ConcurrentFuturesWorker(n_procs=None)

Bases: Worker

A worker to execute tasks in parallel using Python’s concurrent.futures module.

close()

Finalize the internal pool of tasks.

async exec_as_coro(runnable, rerun=False, environment=None)

Run a task (coroutine wrapper).

plugin_name = 'cf'

run_el(runnable, rerun=False, environment=None, **kwargs)

Run a task.
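The general pattern the class description points to, handing a blocking callable to a concurrent.futures pool and awaiting the result from a coroutine, can be sketched with the standard library alone. This is not Pydra's implementation: `square`, `run_in_pool`, and `run_all` are hypothetical names, and a thread pool stands in for whichever executor the worker actually uses.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Stand-in for a blocking task body (hypothetical)."""
    return x * x


async def run_in_pool(pool, func, *args):
    """Await a blocking callable executed in a concurrent.futures pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, func, *args)


async def run_all(values, n_procs=2):
    # n_procs borrows the constructor argument name; mapping it to
    # max_workers is an assumption made for this sketch.
    pool = ThreadPoolExecutor(max_workers=n_procs)
    try:
        return await asyncio.gather(*(run_in_pool(pool, square, v) for v in values))
    finally:
        pool.shutdown()  # release the pool, similar in spirit to close()


results = asyncio.run(run_all([1, 2, 3]))
```

`asyncio.gather` preserves input order, so the results line up with the submitted values regardless of which pool thread finishes first.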

class pydra.engine.workers.DaskWorker(**kwargs)

Bases: Worker

A worker to execute tasks in parallel using Dask.distributed. This is an experimental implementation with limited testing.

close()

Finalize the internal pool of tasks.

async exec_dask(runnable, rerun=False)

Run a task (coroutine wrapper).

plugin_name = 'dask'

run_el(runnable, rerun=False, **kwargs)

Run a task.

class pydra.engine.workers.DistributedWorker(loop=None, max_jobs=None)

Bases: Worker

Base Worker for distributed execution.

async fetch_finished(futures)

Awaits asyncio.Task objects until one is finished.

Limits the number of submissions based on DistributedWorker.max_jobs.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio.Task objects.

Returns:

pending – Pending asyncio.Task objects.

Return type:

set

max_jobs

Maximum number of concurrently running jobs.
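The behavior documented for fetch_finished, waiting until at least one awaitable finishes while capping submissions at max_jobs, can be sketched as follows. This is a minimal standalone version written under assumptions, not the method's actual body: the free function, the `job` coroutine, and the `finished` list are all hypothetical.

```python
import asyncio


async def fetch_finished(futures, max_jobs=None):
    """Wait until one awaitable finishes; return everything still outstanding."""
    futures = list(futures)
    unqueued = []
    if max_jobs is not None and len(futures) > max_jobs:
        # Hold back whatever exceeds the cap; it is returned untouched.
        futures, unqueued = futures[:max_jobs], futures[max_jobs:]
    if not futures:
        return set(unqueued)
    # Wrap bare coroutines in Tasks; Tasks from an earlier round pass through.
    tasks = [f if isinstance(f, asyncio.Task) else asyncio.ensure_future(f) for f in futures]
    _done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    return pending | set(unqueued)


finished = []


async def job(delay):
    """Hypothetical task body that records its own completion."""
    await asyncio.sleep(delay)
    finished.append(delay)


async def demo():
    pending = {job(0.01), job(0.02), job(0.03)}
    rounds = 0
    while pending:
        pending = await fetch_finished(pending, max_jobs=2)
        rounds += 1
    return rounds


rounds = asyncio.run(demo())
```

With three jobs and a cap of two, the third job cannot start in the first round, so the caller has to loop at least twice before the pending set is empty.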

class pydra.engine.workers.PsijLocalWorker(**kwargs)

Bases: PsijWorker

A worker to execute tasks using PSI/J on the local machine.

plugin_name = 'psij-local'

subtype = 'local'

class pydra.engine.workers.PsijLocalWorker(**kwargs)

Bases: PsijWorker

A worker to execute tasks through PSI/J on SLURM.

plugin_name = 'psij-slurm'

subtype = 'slurm'

class pydra.engine.workers.PsijWorker(**kwargs)

Bases: Worker

A worker to execute tasks using PSI/J.

close()

Finalize the internal pool of tasks.

async exec_psij(runnable, rerun=False)

Run a task (coroutine wrapper).

Raises:

Exception – If stderr is not empty.

Return type:

None
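The documented failure mode, raising when stderr is not empty, can be illustrated without PSI/J by running a subprocess and inspecting its error stream. This sketch is an analogy only: `run_and_check` is a hypothetical helper, it uses asyncio subprocesses rather than a PSI/J job, and unlike exec_psij it returns the captured stdout so the demo has something to show.

```python
import asyncio
import sys


async def run_and_check(cmd, *args):
    """Run a command and raise if it writes anything to stderr."""
    proc = await asyncio.create_subprocess_exec(
        cmd,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if stderr:
        raise Exception(f"stderr is not empty: {stderr.decode().strip()}")
    return stdout.decode()


async def demo():
    # A quiet command succeeds; a command that writes to stderr raises.
    ok = await run_and_check(sys.executable, "-c", "print('done')")
    try:
        await run_and_check(sys.executable, "-c", "import sys; sys.stderr.write('boom')")
    except Exception as exc:
        return ok.strip(), str(exc)
    return ok.strip(), None


out, err = asyncio.run(demo())
```

Treating any stderr output as a failure is strict: tools that log warnings to stderr would also trigger the exception under this rule.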

make_job(spec, attributes)

Create a PSI/J job.

Parameters:
  • spec (psij.JobSpec) – PSI/J job specification.

  • attributes (any) – Job attributes.

Returns:

PSI/J job.

Return type:

psij.Job

make_spec(cmd=None, arg=None)

Create a PSI/J job specification.

Parameters:
  • cmd (str, optional) – Executable command. Defaults to None.

  • arg (list, optional) – List of arguments. Defaults to None.

Returns:

PSI/J job specification.

Return type:

psij.JobSpec

run_el(interface, rerun=False, **kwargs)

Run a task.

class pydra.engine.workers.SGEWorker(loop=None, max_jobs=None, poll_delay=1, qsub_args=None, write_output_files=True, max_job_array_length=50, indirect_submit_host=None, max_threads=None, poll_for_result_file=True, default_threads_per_task=1, polls_before_checking_evicted=60, collect_jobs_delay=30, default_qsub_args='', max_mem_free=None)

Bases: DistributedWorker

A worker to execute tasks on SGE (Sun Grid Engine) systems.

async check_for_results_files(jobid, threads_requested)

async get_output_by_task_pkl(task_pkl)

async get_tasks_to_run(task_qsub_args, mem_free)

plugin_name = 'sge'

run_el(runnable, rerun=False)

Worker submission API.

async submit_array_job(sargs, tasks_to_run, error_file)

class pydra.engine.workers.SerialWorker(**kwargs)

Bases: Worker

A worker to execute tasks serially, one at a time.

close()

Close this worker.

async exec_serial(runnable, rerun=False, environment=None)

async fetch_finished(futures)

Awaits asyncio.Task objects until one is finished.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio.Task objects.

Returns:

pending – Pending asyncio.Task objects.

Return type:

set

plugin_name = 'serial'

run_el(interface, rerun=False, environment=None, **kwargs)

Run a task.

class pydra.engine.workers.SlurmWorker(loop=None, max_jobs=None, poll_delay=1, sbatch_args=None)

Bases: DistributedWorker

A worker to execute tasks on SLURM systems.

plugin_name = 'slurm'

run_el(runnable, rerun=False, environment=None)

Worker submission API.
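The poll_delay constructor argument suggests that the worker checks job status at a fixed interval. A generic polling loop of that kind might look like the sketch below. Everything here is assumed for illustration: `poll_until_done`, the `check_status` callable, and the state strings are hypothetical and say nothing about how SlurmWorker actually queries the scheduler.

```python
import asyncio


async def poll_until_done(check_status, poll_delay=1):
    """Poll `check_status` (hypothetical callable) until it reports a final state."""
    polls = 0
    while True:
        status = check_status()
        polls += 1
        if status in ("COMPLETED", "FAILED"):
            return status, polls
        # Yield to the event loop between polls so other tasks can progress.
        await asyncio.sleep(poll_delay)


# Canned responses standing in for a real scheduler query.
_states = iter(["PENDING", "RUNNING", "COMPLETED"])
status, polls = asyncio.run(poll_until_done(lambda: next(_states), poll_delay=0.01))
```

Because the wait uses `asyncio.sleep` rather than `time.sleep`, many jobs can be polled concurrently from a single event loop.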

class pydra.engine.workers.Worker(loop=None)

Bases: object

A base class for execution of tasks.

close()

Close this worker.

async fetch_finished(futures)

Awaits asyncio.Task objects until one is finished.

Parameters:

futures (set of asyncio awaitables) – Task execution coroutines or asyncio.Task objects.

Returns:

pending – Pending asyncio.Task objects.

Return type:

set

run_el(interface, **kwargs)

Return coroutine for task execution.
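The contract described for the base class, where run_el returns a coroutine and close releases the worker, can be sketched with stand-in classes. `SketchWorker` and `EchoWorker` are hypothetical and do not import or subclass anything from Pydra; they only mirror the documented method names, and treating `interface` as a plain callable is an assumption.

```python
import asyncio


class SketchWorker:
    """Hypothetical stand-in mirroring the documented Worker method names."""

    def run_el(self, interface, **kwargs):
        """Return a coroutine for task execution; subclasses override this."""
        raise NotImplementedError

    def close(self):
        """Close this worker."""


class EchoWorker(SketchWorker):
    """Toy subclass that calls `interface` inside the event loop."""

    def __init__(self):
        self.closed = False

    def run_el(self, interface, **kwargs):
        # Return the coroutine object without awaiting it; the caller schedules it.
        return self._exec(interface)

    async def _exec(self, interface):
        await asyncio.sleep(0)  # yield control once, as a real worker might
        return interface()

    def close(self):
        self.closed = True


async def demo():
    worker = EchoWorker()
    try:
        return worker, await asyncio.gather(
            worker.run_el(lambda: "a"),
            worker.run_el(lambda: "b"),
        )
    finally:
        worker.close()


worker, results = asyncio.run(demo())
```

Keeping run_el synchronous while it returns a coroutine lets the caller decide when and how to schedule the work, for example by batching several coroutines before awaiting them.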