# Source code for cyto.tasks.base
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
[docs]
class PipelineTask(ABC):
    """
    Common base for pipeline tasks.

    A task instance is callable. Calling it routes the input either to
    ``run_container`` or to ``run_baremetal``, depending on the ``"type"``
    entry of ``execution_config``. ``run_baremetal`` is looked up on the
    instance at call time, so concrete tasks must provide it.
    """

    def __init__(self, execution_config: Optional[Dict[str, Any]] = None) -> None:
        """
        Store the execution configuration.

        Args:
            execution_config (Optional[Dict[str, Any]]): Execution settings.
                The keys read by this class are ``"type"`` and ``"runner"``.
                ``None`` or any other falsy value is replaced by a new empty
                dict.
        """
        if execution_config:
            self.execution_config = execution_config
        else:
            self.execution_config = {}

    def __call__(self, data: Any) -> Any:
        """
        Run the task on ``data`` in the configured environment.

        ``run_container`` is used only when ``execution_config["type"]`` is
        exactly ``"container"``; a missing key or any other value selects
        ``run_baremetal``.

        Args:
            data (Any): Input passed through unchanged to the selected method.

        Returns:
            Any: Whatever the selected method returns.
        """
        containerized = self.execution_config.get("type") == "container"
        # Only the selected attribute is looked up, so a task that lacks the
        # other method can still be called.
        handler = self.run_container if containerized else self.run_baremetal
        return handler(data)

    def _get_runner(self):
        """
        Build the container runner named by ``execution_config["runner"]``.

        The runner name defaults to ``"docker"`` when the key is absent.

        Returns:
            A ``DockerRunner`` constructed with ``execution_config``.

        Raises:
            NotImplementedError: If the runner name is ``"singularity"``.
            ValueError: If the runner name is anything else unrecognised.
        """
        selected = self.execution_config.get("runner", "docker")

        # Reject unsupported names first so the happy path reads straight down.
        if selected == "singularity":
            raise NotImplementedError("Singularity runner is not yet implemented.")
        if selected != "docker":
            raise ValueError(f"Unknown runner type: {selected}")

        # Imported here rather than at module level to avoid circular imports
        # and to keep the Docker dependency optional.
        from cyto.runners.docker import DockerRunner

        return DockerRunner(self.execution_config)

    def run_container(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute this task through a container runner.

        This is the default container path; subclasses may override it for
        specialised behaviour. It prints a status line, obtains a runner from
        ``_get_runner()`` and returns ``runner.run(self, data)``.

        The runner is implemented outside this class. According to the
        original author's notes it is expected to serialize the task and the
        data, start a container from the configured image, have the
        container worker call ``run_baremetal(data)`` inside it, and
        deserialize the result.

        Args:
            data (Dict[str, Any]): Input data for the task.

        Returns:
            Dict[str, Any]: The value returned by the runner's ``run`` method.
        """
        print("Running containerized task...")
        # The task object itself is handed over so the runner can invoke it
        # on the other side of the container boundary.
        return self._get_runner().run(self, data)