Source code for cyto.tasks.graph

import yaml
import networkx as nx
import os
from typing import Dict, Any, List, Optional
from cyto.tasks.definitions import Task

def build_task_graph(config_path: str, resources_path: Optional[str] = None) -> tuple[nx.DiGraph, Optional[Dict]]:
    """
    Build a directed acyclic graph (DAG) of tasks from a YAML pipeline configuration.

    Each entry under the top-level ``tasks`` mapping becomes a node (named by
    its key, with the constructed ``Task`` stored under the ``task`` node
    attribute). Each listed dependency becomes an edge ``dependency -> task``.

    Args:
        config_path (str): Path to the YAML configuration file.
        resources_path (str, optional): Path to the resources YAML file. If it
            is not given or does not exist, no resources are loaded.

    Returns:
        tuple: (NetworkX directed graph, resources configuration or None)

    Raises:
        ValueError: If the configuration has no ``tasks`` mapping, a task entry
            is not a mapping, a dependency refers to an undefined task, or the
            dependencies form a cycle.
        KeyError: If a task entry lacks the required ``module`` key.
    """
    # Load the YAML configuration; explicit encoding avoids platform-dependent decoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # An empty file yields None; guard before the membership test so the
    # caller gets a clear ValueError rather than a TypeError.
    if not isinstance(config, dict) or 'tasks' not in config:
        raise ValueError("Configuration file must contain a 'tasks' section")

    task_definitions = config['tasks']
    if not isinstance(task_definitions, dict):
        raise ValueError("The 'tasks' section must be a mapping of task names to task configurations")

    graph = nx.DiGraph()

    # Create Task objects and add them as nodes
    tasks = {}
    for task_name, task_config in task_definitions.items():
        # A bare "task_name:" entry parses as None; reject non-mappings explicitly.
        if not isinstance(task_config, dict):
            raise ValueError(f"Configuration for task '{task_name}' must be a mapping")
        if 'module' not in task_config:
            # Keep KeyError (what the original subscript raised) but name the task.
            raise KeyError(f"Task '{task_name}' is missing required key 'module'")

        task = Task(
            name=task_name,
            module=task_config['module'],
            params=task_config.get('params', {}),
            dependencies=task_config.get('dependencies', []),
            execution_config=task_config.get('execution_config'),
            tags=task_config.get('tags', []),
        )
        tasks[task_name] = task
        graph.add_node(task_name, task=task)

    # Add edges based on dependencies (dependency -> task), so a topological
    # sort yields dependencies before their dependents.
    for task_name, task in tasks.items():
        for dependency in task.dependencies:
            if dependency not in tasks:
                raise ValueError(f"Task '{task_name}' depends on '{dependency}' which is not defined")
            graph.add_edge(dependency, task_name)

    # Check for cycles in the graph (a self-dependency shows up as a self-loop).
    if not nx.is_directed_acyclic_graph(graph):
        cycle = nx.find_cycle(graph)
        raise ValueError(f"Circular dependency detected in task graph: {cycle}")

    # Resources are optional: a missing path or file is silently treated as "none".
    resources_config = None
    if resources_path and os.path.exists(resources_path):
        with open(resources_path, 'r', encoding='utf-8') as f:
            resources_config = yaml.safe_load(f)

    return graph, resources_config
def get_execution_order(graph: nx.DiGraph) -> List[str]:
    """
    Return task names in an order where every task follows its dependencies.

    Args:
        graph (nx.DiGraph): The task dependency graph, with edges pointing
            from a dependency to the task that needs it.

    Returns:
        List[str]: Task names in a valid (topological) execution order.
    """
    ordering = nx.topological_sort(graph)
    return [*ordering]
def get_task_by_name(graph: nx.DiGraph, task_name: str) -> Task:
    """
    Look up the Task object stored on a graph node.

    Args:
        graph (nx.DiGraph): The task dependency graph.
        task_name (str): Name of the node whose task should be returned.

    Returns:
        Task: The object stored under the node's ``task`` attribute.

    Raises:
        KeyError: If ``task_name`` is not a node of ``graph``.
    """
    # Membership check (rather than try/except) also turns unhashable names
    # into a KeyError, since graph membership tests return False for them.
    present = task_name in graph
    if present:
        node_attributes = graph.nodes[task_name]
        return node_attributes['task']
    raise KeyError(f"Task '{task_name}' not found in graph")