cyto.tasks
Pipeline task graph infrastructure — base task class, task definitions, dependency graph, and task manager used by both standalone and distributed execution modes.
- class cyto.tasks.base.PipelineTask(execution_config: Dict[str, Any] | None = None)
Bases: ABC
Base class for all pipeline tasks, providing a common interface for execution in different environments (baremetal, container).
- abstract run_baremetal(data: Dict[str, Any]) -> Dict[str, Any]
The core logic of the task to be executed on the local machine. This method must be implemented by all subclasses.
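As a minimal sketch of the subclassing contract, the example below uses a stand-in base class that mirrors only the documented signature of PipelineTask; it is not the cyto implementation. `ScaleTask` and its `factor` setting are hypothetical names chosen for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class PipelineTask(ABC):
    """Stand-in mirroring the documented signature (not the cyto class)."""

    def __init__(self, execution_config: Optional[Dict[str, Any]] = None):
        self.execution_config = execution_config or {}

    @abstractmethod
    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        ...


class ScaleTask(PipelineTask):
    """Hypothetical task: multiplies data["values"] by a configured factor."""

    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        factor = self.execution_config.get("factor", 1)
        return {**data, "values": [v * factor for v in data["values"]]}


result = ScaleTask({"factor": 2}).run_baremetal({"values": [1, 2, 3]})
print(result)  # {'values': [2, 4, 6]}
```

Because run_baremetal() is abstract, instantiating the base class directly raises TypeError; only subclasses that implement it can be created.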
- run_container(data: Dict[str, Any]) -> Dict[str, Any]
The core logic of the task to be executed in a container.
This method provides the default container execution logic that can be used by subclasses or overridden for specialized behavior.
The container execution flow:
1. Get the appropriate runner (Docker/Singularity) from execution_config.
2. Pass the task (self) and data to the runner.
3. The runner serializes the task and data, runs them in a container, and deserializes the result.
4. Inside the container, the container_worker calls run_baremetal().
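The flow above can be simulated in a single process. The sketch below starts no container and does not use cyto's runners: `FakeRunner`, the `TASKS` registry, the JSON payload format, and `DoubleTask` are all assumptions made for illustration, and the real serialization format is not stated here.

```python
import json
from typing import Any, Dict


class DoubleTask:
    """Hypothetical task exposing the documented run_baremetal() method."""

    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return {"values": [2 * v for v in data["values"]]}


# Hypothetical registry so the worker can rebuild a task from its name.
TASKS = {"DoubleTask": DoubleTask}


def container_worker(payload: str) -> str:
    """Stand-in for the code that would run inside the container."""
    request = json.loads(payload)
    task = TASKS[request["task"]]()  # rebuild the task from its name
    return json.dumps(task.run_baremetal(request["data"]))


class FakeRunner:
    """Stand-in for a Docker/Singularity runner; no container is started."""

    def run(self, task: Any, data: Dict[str, Any]) -> Dict[str, Any]:
        # Serialize the task reference and the data, then decode the result.
        payload = json.dumps({"task": type(task).__name__, "data": data})
        return json.loads(container_worker(payload))


def run_container(task: Any, data: Dict[str, Any], runner: FakeRunner) -> Dict[str, Any]:
    # Hand the task and the data to the runner and return its result.
    return runner.run(task, data)


out = run_container(DoubleTask(), {"values": [1, 2]}, FakeRunner())
print(out)  # {'values': [2, 4]}
```

The point of the sketch is the round trip: the task logic lives only in run_baremetal(), so the same method serves both execution modes.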
- class cyto.tasks.definitions.Task(name: str, module: str, params: Dict[str, Any] = <factory>, dependencies: List[str] = <factory>, execution_config: Dict[str, Any] | None = None, tags: List[str] = <factory>)
Bases: object
A data class representing a single task definition from the pipeline configuration file. It holds all the metadata and parameters needed to instantiate and execute a task.
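The `<factory>` defaults in the signature indicate fields built with a default factory. The stand-in below mirrors the documented fields to show what that means in practice; it is not the cyto class, the choice of `dict` and `list` as factories is an assumption, and the module paths are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Task:
    """Stand-in mirroring the documented fields (not the cyto class)."""
    name: str
    module: str
    params: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    execution_config: Optional[Dict[str, Any]] = None
    tags: List[str] = field(default_factory=list)


load = Task(name="load", module="my_pipeline.load")  # hypothetical module path
segment = Task(name="segment", module="my_pipeline.segment", dependencies=["load"])

load.tags.append("io")
print(segment.tags)          # [] (each instance gets its own list)
print(segment.dependencies)  # ['load']
```

Only `name` and `module` are required; every other field falls back to an empty container or None.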
- cyto.tasks.graph.build_task_graph(config_path: str, resources_path: str | None = None) -> tuple[DiGraph, Dict | None]
Build a directed acyclic graph (DAG) of tasks from a YAML pipeline configuration.
- cyto.tasks.graph.get_execution_order(graph: DiGraph) -> List[str]
Get the topological ordering of tasks for execution.
- Parameters:
graph (nx.DiGraph) – The task dependency graph
- Returns:
List of task names in execution order
- Return type:
List[str]
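To illustrate what a topological ordering guarantees, the sketch below uses the standard library's `graphlib.TopologicalSorter` rather than a networkx DiGraph, so it runs without extra dependencies. This is a swapped-in technique, not necessarily how get_execution_order() is implemented, and the task names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the tasks it depends on.
dependencies = {
    "load": [],
    "segment": ["load"],
    "measure": ["segment"],
    "report": ["segment", "measure"],
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['load', 'segment', 'measure', 'report']
```

Every task appears after all of its dependencies. When the graph allows more than one valid order, the choice among them depends on the implementation.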
- cyto.tasks.graph.get_task_by_name(graph: DiGraph, task_name: str) -> Task
Get a Task object by name from the graph.
- class cyto.tasks.manager.TaskManager(graph: DiGraph, resources_config: Dict | None = None, profile: str = 'default', verbose: bool = True)
Bases: object
Manages the execution of a pipeline of tasks based on their dependency graph.
- execute(initial_data: Dict[str, Any]) -> Dict[str, Any]
Execute all tasks in the pipeline according to their dependency order.
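One way to picture dependency-ordered execution is the sketch below. It is not the TaskManager implementation: the standalone `execute` function, the step callables, and the assumption that each task's output is merged into a shared data dictionary are all hypothetical, since the data-passing rules are not stated here.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def execute(steps: Dict[str, Step], deps: Dict[str, List[str]],
            initial_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run each step after its dependencies, merging outputs (assumed)."""
    data = dict(initial_data)  # leave the caller's dict untouched
    for name in TopologicalSorter(deps).static_order():
        data.update(steps[name](data))
    return data


# Hypothetical two-step pipeline.
steps = {
    "load": lambda d: {"values": [1, 2, 3]},
    "total": lambda d: {"total": sum(d["values"])},
}
deps = {"load": [], "total": ["load"]}

final = execute(steps, deps, {"run_id": "demo"})
print(final)  # {'run_id': 'demo', 'values': [1, 2, 3], 'total': 6}
```

Here "total" can read `values` only because "load" is guaranteed to run first, which is the property the dependency order provides.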