Source code for cyto.tasks.base

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class PipelineTask(ABC):
    """Abstract base for pipeline tasks with pluggable execution environments.

    A task is invoked by calling the instance. Its ``execution_config`` decides
    whether the work runs directly in the current process (``run_baremetal``)
    or is delegated to a container runner (``run_container``).

    Recognised ``execution_config`` keys:
        type: ``"container"`` selects containerised execution. Any other value,
            or no value at all, selects bare-metal execution.
        runner: Container backend used when ``type`` is ``"container"``.
            Defaults to ``"docker"``. ``"singularity"`` is reserved but not
            implemented yet.
    """

    def __init__(self, execution_config: Optional[Dict[str, Any]] = None) -> None:
        """Store the execution configuration.

        Args:
            execution_config: Mapping that controls how the task is executed.
                Any falsy value (``None`` or an empty mapping) is replaced with
                a fresh empty dict.
        """
        self.execution_config = {} if not execution_config else execution_config

    def __call__(self, data: Any) -> Any:
        """Run the task in the environment selected by ``execution_config``.

        Args:
            data: Input payload, forwarded unchanged to the chosen run method.

        Returns:
            Whatever ``run_container`` or ``run_baremetal`` returns.
        """
        wants_container = self.execution_config.get("type") == "container"
        if wants_container:
            return self.run_container(data)
        return self.run_baremetal(data)

    def _get_runner(self):
        """Build the container runner named by ``execution_config["runner"]``.

        Returns:
            A runner instance constructed with this task's ``execution_config``.

        Raises:
            NotImplementedError: If the Singularity backend is requested.
            ValueError: If the runner name is not recognised.
        """
        backend = self.execution_config.get("runner", "docker")
        if backend == "docker":
            # Imported lazily so the Docker dependency stays optional and
            # circular imports between tasks and runners are avoided.
            from cyto.runners.docker import DockerRunner

            return DockerRunner(self.execution_config)
        if backend == "singularity":
            raise NotImplementedError("Singularity runner is not yet implemented.")
        raise ValueError(f"Unknown runner type: {backend}")

    @abstractmethod
    def run_baremetal(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Perform the task's work directly on the local machine.

        Every concrete subclass must provide this method.

        Args:
            data (Dict[str, Any]): Input data for the task.

        Returns:
            Dict[str, Any]: Results produced by the task.
        """

    def run_container(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Delegate execution of this task to a container runner.

        This is the default containerised path. Subclasses may override it
        when they need specialised behaviour. The runner is obtained from
        ``_get_runner()`` and is handed both this task object and ``data``.
        Per the original design notes, the runner serialises them, runs the
        container worker (which in turn calls ``run_baremetal``), and
        deserialises the result.

        Args:
            data (Dict[str, Any]): Input data for the task.

        Returns:
            Dict[str, Any]: Whatever the runner's ``run`` method returns.
        """
        print("Running containerized task...")
        return self._get_runner().run(self, data)