cyto.tasks

Pipeline task graph infrastructure: the base task class, task definitions, dependency graph, and task manager used by both the standalone and distributed execution modes.

class cyto.tasks.base.PipelineTask(execution_config: Dict[str, Any] | None = None)

Bases: ABC

Abstract base class for all pipeline tasks. It provides a common interface for running a task in different environments: baremetal (directly on the local machine) or inside a container.

abstract run_baremetal(data: Dict[str, Any]) → Dict[str, Any]

The core logic of the task to be executed on the local machine. This method must be implemented by all subclasses.

Parameters:

data (Dict[str, Any]) – Input data for the task

Returns:

Results from task execution

Return type:

Dict[str, Any]
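A minimal sketch of a subclass, assuming only the interface documented here. To keep the block self-contained it defines a hypothetical stand-in `PipelineTask` instead of importing `cyto.tasks.base`; the `NormalizeTask` and `Incomplete` classes and the `values` key are illustrative only.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class PipelineTask(ABC):
    """Hypothetical stand-in mirroring the documented PipelineTask interface."""

    def __init__(self, execution_config: Optional[Dict[str, Any]] = None):
        self.execution_config = execution_config

    @abstractmethod
    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Core task logic, executed directly on the local machine."""


class NormalizeTask(PipelineTask):
    """Illustrative task: scales a list of numbers so they sum to 1."""

    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        values = data["values"]
        total = sum(values)
        # Guard against division by zero for an all-zero input.
        normalized = [v / total for v in values] if total else list(values)
        return {"normalized": normalized}


print(NormalizeTask().run_baremetal({"values": [1, 1, 2]}))
# {'normalized': [0.25, 0.25, 0.5]}


class Incomplete(PipelineTask):
    """Forgets to implement run_baremetal."""


try:
    Incomplete()
except TypeError as exc:
    # ABC refuses to instantiate a subclass with unimplemented abstract methods.
    print("cannot instantiate:", exc)
```

Because `run_baremetal` is declared with `@abstractmethod`, a subclass that omits it fails at instantiation time rather than when the pipeline later tries to run it.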

run_container(data: Dict[str, Any]) → Dict[str, Any]

Execute the task inside a container.

This method provides default container execution logic that subclasses can use as-is or override for specialized behavior.

The container execution flow:

  1. Get the appropriate runner (Docker/Singularity) from execution_config.

  2. Pass the task (self) and data to the runner.

  3. The runner serializes the task and data, runs them in a container, and deserializes the result.

  4. Inside the container, the container_worker calls run_baremetal().

Parameters:

data (Dict[str, Any]) – Input data for the task

Returns:

Results from container execution

Return type:

Dict[str, Any]
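The container execution flow above can be sketched in-process as follows. This illustrates the control flow only and is not the cyto implementation: the `runner` key in `execution_config`, the `LocalEchoRunner` class, and the `get_runner` and `container_worker` helpers here are hypothetical, `pickle` stands in for whatever serialization the real Docker/Singularity runners use, and no container is actually launched.

```python
import copy
import pickle
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class PipelineTask(ABC):
    """Hypothetical stand-in for cyto.tasks.base.PipelineTask."""

    def __init__(self, execution_config: Optional[Dict[str, Any]] = None):
        self.execution_config = execution_config or {}

    @abstractmethod
    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]: ...

    def run_container(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Step 1: pick a runner based on the execution config.
        runner = get_runner(self.execution_config)
        # Step 2: hand the task itself and its input data to the runner.
        return runner.run(self, data)


def container_worker(task: PipelineTask, payload: bytes) -> bytes:
    """Stand-in for the code that runs inside the container (step 4)."""
    data = pickle.loads(payload)
    return pickle.dumps(task.run_baremetal(data))


class LocalEchoRunner:
    """Hypothetical runner that imitates the container round-trip in-process."""

    def run(self, task: PipelineTask, data: Dict[str, Any]) -> Dict[str, Any]:
        # Step 3: serialize the inputs. A real runner would also ship the task
        # into the container; deepcopy gives the worker its own copy instead.
        payload = pickle.dumps(data)
        isolated_task = copy.deepcopy(task)
        result_bytes = container_worker(isolated_task, payload)
        # Deserialize the result returned by the "container".
        return pickle.loads(result_bytes)


def get_runner(execution_config: Dict[str, Any]) -> LocalEchoRunner:
    # A real implementation would choose Docker or Singularity here;
    # this sketch only knows one hypothetical runner.
    runners = {"local-echo": LocalEchoRunner}
    return runners[execution_config.get("runner", "local-echo")]()


class SquareTask(PipelineTask):
    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return {"squared": data["x"] ** 2}


print(SquareTask({"runner": "local-echo"}).run_container({"x": 7}))
# {'squared': 49}
```

The key design point the sketch preserves is that subclasses only implement `run_baremetal`; the container path reuses that same method on the far side of a serialization boundary, so both modes produce the same result.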

class cyto.tasks.definitions.Task(name: str, module: str, params: Dict[str, Any] = <factory>, dependencies: List[str] = <factory>, execution_config: Dict[str, Any] | None = None, tags: List[str] = <factory>)

Bases: object

A data class representing a single task definition from the pipeline configuration file. It holds all the metadata and parameters needed to instantiate and execute a task.
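The `<factory>` defaults in the signature are how autodoc renders dataclass fields declared with `dataclasses.field(default_factory=...)`: each instance gets its own fresh list or dict. A hedged reconstruction, assuming standard `dataclasses` usage. `instance` is listed as an attribute but is not a constructor parameter, so this sketch assumes it is declared with `init=False`; the module paths are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Task:
    """Hypothetical reconstruction of cyto.tasks.definitions.Task."""

    name: str
    module: str
    params: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    execution_config: Optional[Dict[str, Any]] = None
    tags: List[str] = field(default_factory=list)
    # Not a constructor argument; assumed to hold the instantiated task later.
    instance: Any = field(default=None, init=False)


a = Task(name="load", module="example.tasks.load")
b = Task(name="filter", module="example.tasks.filter", dependencies=["load"])
b.tags.append("qc")
print(a.tags, b.tags)  # [] ['qc']  (no shared mutable default)
```

Using `default_factory` rather than a literal `[]` or `{}` default is what keeps tags and params from leaking between task definitions.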

dependencies: List[str]
execution_config: Dict[str, Any] | None = None
instance: Any | None = None
module: str
name: str
params: Dict[str, Any]
tags: List[str]

cyto.tasks.graph.build_task_graph(config_path: str, resources_path: str | None = None) → tuple[DiGraph, Dict | None]

Build a directed acyclic graph (DAG) of tasks from a YAML pipeline configuration.

Parameters:
  • config_path (str) – Path to the YAML configuration file

  • resources_path (str, optional) – Path to the resources YAML file

Returns:

A tuple (graph, resources): the NetworkX directed graph of tasks, and the resources configuration dict (or None)

Return type:

tuple[DiGraph, Dict | None]
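The validation a DAG builder needs can be sketched with the standard library. This is not the cyto implementation: the configuration schema (a top-level `tasks` list whose entries carry `name` and optional `dependencies`) is a hypothetical assumption, and the input is an already-parsed dict rather than a YAML path. With networkx, the equivalent steps would plausibly use `graph.add_edge(dependency, name)` and `nx.is_directed_acyclic_graph(graph)`.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List


def build_dependency_map(config: Dict[str, Any]) -> Dict[str, List[str]]:
    """Map each task name to the names it depends on, validating the result.

    `config` is assumed to be parsed YAML with a hypothetical top-level
    "tasks" list, each entry carrying "name" and optional "dependencies".
    """
    deps: Dict[str, List[str]] = {}
    for entry in config["tasks"]:
        name = entry["name"]
        if name in deps:
            raise ValueError(f"duplicate task name: {name!r}")
        deps[name] = list(entry.get("dependencies", []))

    # Every dependency must refer to a task defined in the same config.
    for name, parents in deps.items():
        missing = [p for p in parents if p not in deps]
        if missing:
            raise ValueError(f"task {name!r} depends on unknown task(s): {missing}")

    # A DAG must have no cycles; prepare() raises CycleError if it does.
    try:
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        # exc.args[1] holds the list of nodes forming the detected cycle.
        raise ValueError(f"dependency cycle detected: {exc.args[1]}") from exc
    return deps


config = {"tasks": [
    {"name": "load"},
    {"name": "filter", "dependencies": ["load"]},
    {"name": "cluster", "dependencies": ["filter"]},
]}
print(build_dependency_map(config))
# {'load': [], 'filter': ['load'], 'cluster': ['filter']}
```

Failing fast on unknown or cyclic dependencies at graph-construction time means configuration errors surface before any task runs.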

cyto.tasks.graph.get_execution_order(graph: DiGraph) → List[str]

Get the topological ordering of tasks for execution.

Parameters:

graph (nx.DiGraph) – The task dependency graph

Returns:

List of task names in execution order

Return type:

List[str]
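A topological ordering places every task after all of its dependencies. The sketch below implements Kahn's algorithm over a plain `{task: [dependencies]}` mapping standing in for the `nx.DiGraph`. A topological order is generally not unique; breaking ties alphabetically is an assumption made here for reproducibility, not documented behavior. networkx itself provides `nx.topological_sort(graph)`.

```python
import heapq
from typing import Dict, List


def execution_order(deps: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm: repeatedly emit a task whose dependencies are all done."""
    remaining = {name: len(parents) for name, parents in deps.items()}
    dependents: Dict[str, List[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in parents:
            dependents[parent].append(name)

    # Min-heap of ready tasks gives a deterministic, alphabetical tie-break.
    ready = [name for name, count in remaining.items() if count == 0]
    heapq.heapify(ready)
    order: List[str] = []
    while ready:
        name = heapq.heappop(ready)
        order.append(name)
        for child in dependents[name]:
            remaining[child] -= 1
            if remaining[child] == 0:
                heapq.heappush(ready, child)

    # Tasks left unemitted are stuck behind a cycle.
    if len(order) != len(deps):
        raise ValueError("graph contains a cycle")
    return order


deps = {"load": [], "qc": ["load"], "normalize": ["load"], "cluster": ["qc", "normalize"]}
print(execution_order(deps))  # ['load', 'normalize', 'qc', 'cluster']
```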

cyto.tasks.graph.get_task_by_name(graph: DiGraph, task_name: str) → Task

Get a Task object by name from the graph.

Parameters:
  • graph (nx.DiGraph) – The task dependency graph

  • task_name (str) – Name of the task to retrieve

Returns:

The Task object

Return type:

Task

Raises:

KeyError – If task_name is not found in the graph
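The documented `KeyError` contract can be illustrated with a plain dict standing in for the graph's node table. The `"task"` node-attribute name is an assumption about how each `Task` is stored, not something this page documents; in networkx, `graph.nodes[name]` likewise raises `KeyError` for a missing node. The module path is hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Task:
    """Trimmed stand-in for cyto.tasks.definitions.Task."""
    name: str
    module: str


def get_task_by_name(nodes: Dict[str, Dict[str, Any]], task_name: str) -> Task:
    """Return the Task stored on node `task_name`, or raise KeyError."""
    if task_name not in nodes:
        # List the known names in the message to make config typos easy to spot.
        raise KeyError(f"task {task_name!r} not found; known tasks: {sorted(nodes)}")
    return nodes[task_name]["task"]


nodes = {"load": {"task": Task("load", "example.tasks.load")}}
print(get_task_by_name(nodes, "load").module)  # example.tasks.load
try:
    get_task_by_name(nodes, "lod")
except KeyError as exc:
    print("lookup failed:", exc)
```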

class cyto.tasks.manager.TaskManager(graph: DiGraph, resources_config: Dict | None = None, profile: str = 'default', verbose: bool = True)

Bases: object

Manages the execution of a pipeline of tasks based on their dependency graph.

execute(initial_data: Dict[str, Any]) → Dict[str, Any]

Execute all tasks in the pipeline according to their dependency order.

Parameters:

initial_data (Dict[str, Any]) – Initial data to pass to the first tasks

Returns:

Results from the final task(s) in the pipeline

Return type:

Dict[str, Any]
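A simplified sketch of dependency-ordered execution. `MiniTaskManager` is hypothetical and takes plain callables instead of a graph of `Task` objects. The data flow is an assumption not documented on this page: each task receives the initial data overlaid with the outputs of its direct dependencies, and the return value is keyed by the name of each final task (one that no other task depends on). The real `execute` may combine final results differently.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List

TaskFn = Callable[[Dict[str, Any]], Dict[str, Any]]


class MiniTaskManager:
    """Hypothetical, simplified stand-in for cyto.tasks.manager.TaskManager."""

    def __init__(self, tasks: Dict[str, TaskFn], deps: Dict[str, List[str]]):
        self.tasks = tasks
        self.deps = deps  # every task name must appear as a key
        self.results: Dict[str, Any] = {}

    def execute(self, initial_data: Dict[str, Any]) -> Dict[str, Any]:
        for name in TopologicalSorter(self.deps).static_order():
            # Assumed data flow: start from the initial data, then layer on the
            # outputs of each direct dependency in declaration order.
            data = dict(initial_data)
            for parent in self.deps[name]:
                data.update(self.results[parent])
            self.results[name] = self.tasks[name](data)

        # "Final" tasks are those no other task depends on.
        has_dependents = {p for parents in self.deps.values() for p in parents}
        return {n: r for n, r in self.results.items() if n not in has_dependents}


manager = MiniTaskManager(
    tasks={
        "load": lambda d: {"values": list(range(d["n"]))},
        "total": lambda d: {"total": sum(d["values"])},
        "count": lambda d: {"count": len(d["values"])},
    },
    deps={"load": [], "total": ["load"], "count": ["load"]},
)
print(manager.execute({"n": 4}))
```

With `n = 4`, `load` produces `[0, 1, 2, 3]`, so the two final tasks report a total of 6 and a count of 4; intermediate results such as `load` stay available in `manager.results`.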

get_task_result(task_name: str) → Any

Get the result from a specific task.

Parameters:

task_name (str) – Name of the task

Returns:

The task result

Return type:

Any

Raises:

KeyError – If the task hasn’t been executed or doesn’t exist

get_task_results() → Dict[str, Any]

Get all task results.

Returns:

Dictionary mapping task names to their results

Return type:

Dict[str, Any]
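The contract of the two result accessors can be sketched as below. `ResultStore` and its `record` method are hypothetical names, and returning a shallow copy from `get_task_results` is a design choice of this sketch, not documented behavior of `TaskManager`. Note that a task that was never executed and a task that does not exist look identical from the results mapping, which is why both cases raise the same `KeyError`.

```python
from typing import Any, Dict


class ResultStore:
    """Hypothetical sketch of a task manager's result accessors."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def record(self, task_name: str, result: Any) -> None:
        self._results[task_name] = result

    def get_task_result(self, task_name: str) -> Any:
        try:
            return self._results[task_name]
        except KeyError:
            # Same exception type as documented, with a more helpful message.
            raise KeyError(
                f"no result for task {task_name!r}: it has not run or does not exist"
            ) from None

    def get_task_results(self) -> Dict[str, Any]:
        # Shallow copy so callers cannot add or remove the store's entries.
        return dict(self._results)


store = ResultStore()
store.record("load", {"rows": 10})
print(store.get_task_result("load"))  # {'rows': 10}
print(store.get_task_results())       # {'load': {'rows': 10}}
```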