# Source code for cyto.tasks.graph
import yaml
import networkx as nx
import os
from typing import Dict, Any, List, Optional
from cyto.tasks.definitions import Task
# [docs]
def build_task_graph(config_path: str, resources_path: Optional[str] = None) -> tuple[nx.DiGraph, Optional[Dict]]:
    """
    Build a directed acyclic graph (DAG) of tasks from a YAML pipeline configuration.

    Every entry of the top-level ``tasks`` mapping becomes a node keyed by the
    task name, with the constructed ``Task`` stored in the node's ``task``
    attribute. Every declared dependency becomes an edge pointing from the
    dependency to the task that needs it.

    Args:
        config_path (str): Path to the YAML configuration file
        resources_path (str, optional): Path to the resources YAML file. When
            omitted, or when the path does not exist, no resources are loaded.

    Returns:
        tuple: (NetworkX directed graph, resources configuration dict or None)

    Raises:
        ValueError: If the configuration is empty or not a mapping, has no
            'tasks' section, the 'tasks' section is not a mapping, a task
            depends on an undefined task, or the dependencies form a cycle.
        KeyError: If a task definition has no 'module' entry.
    """
    # Load the YAML configuration (explicit encoding so parsing does not
    # depend on the platform's locale default).
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # safe_load yields None for an empty document and may yield a list or a
    # scalar; only a mapping can carry a 'tasks' section.
    if not isinstance(config, dict) or 'tasks' not in config:
        raise ValueError("Configuration file must contain a 'tasks' section")

    task_definitions = config['tasks']
    # A bare "tasks:" key parses to None; reject it here instead of failing
    # later with an AttributeError on .items().
    if not isinstance(task_definitions, dict):
        raise ValueError("The 'tasks' section must be a mapping of task names to task definitions")

    # Create a directed graph
    graph = nx.DiGraph()

    # Create Task objects and add them as nodes
    tasks = {}
    for task_name, task_config in task_definitions.items():
        task = Task(
            name=task_name,
            module=task_config['module'],
            params=task_config.get('params', {}),
            dependencies=task_config.get('dependencies', []),
            execution_config=task_config.get('execution_config'),
            tags=task_config.get('tags', []),
        )
        tasks[task_name] = task
        graph.add_node(task_name, task=task)

    # Add edges only after all nodes exist, so a task may depend on one that
    # is declared later in the file.
    for task_name, task in tasks.items():
        for dependency in task.dependencies:
            if dependency not in tasks:
                raise ValueError(f"Task '{task_name}' depends on '{dependency}' which is not defined")
            # Add edge from dependency to task (dependency -> task)
            graph.add_edge(dependency, task_name)

    # Check for cycles in the graph
    if not nx.is_directed_acyclic_graph(graph):
        cycle = nx.find_cycle(graph)
        raise ValueError(f"Circular dependency detected in task graph: {cycle}")

    # Load resources configuration if provided. A path that does not exist is
    # skipped silently and reported to the caller as None.
    resources_config = None
    if resources_path and os.path.exists(resources_path):
        with open(resources_path, 'r', encoding='utf-8') as f:
            resources_config = yaml.safe_load(f)

    return graph, resources_config
# [docs]
def get_execution_order(graph: nx.DiGraph) -> List[str]:
    """
    Compute the order in which the graph's tasks should be executed.

    Args:
        graph (nx.DiGraph): The task dependency graph

    Returns:
        List[str]: Node names of the graph in topological order
    """
    # topological_sort is lazy; materialise it so callers get a concrete list.
    ordering = nx.topological_sort(graph)
    return list(ordering)
# [docs]
def get_task_by_name(graph: nx.DiGraph, task_name: str) -> Task:
    """
    Look up the Task object stored on a named node of the graph.

    Args:
        graph (nx.DiGraph): The task dependency graph
        task_name (str): Name of the task to retrieve

    Returns:
        Task: The object held in the node's 'task' attribute

    Raises:
        KeyError: If task_name is not found in the graph
    """
    if task_name in graph:
        node_attributes = graph.nodes[task_name]
        return node_attributes['task']

    raise KeyError(f"Task '{task_name}' not found in graph")