Compute Node Model

A pyCyto compute node is any Python callable that accepts a dict and returns a dict. This simple contract is what makes the pipeline composable, testable in isolation, and runnable without modification across notebooks, CLI, and SLURM.


The Pure Function Model

```mermaid
graph LR
    A["input_dict"] --> B["Module(params)"]
    B --> C["output_dict"]

    classDef io fill:#1e293b,color:#94a3b8,stroke:#334155
    classDef mod fill:#0d7377,color:#fff,stroke:#0a5c60
    class A,C io
    class B mod
```

Every node is:

  • Stateless per call: all algorithm state lives in `__init__` parameters; the `__call__` method is side-effect-free (except for optional file I/O when `output: true` is set)

  • Type-agnostic: the dict can carry any data — the node only accesses the keys it needs

  • Scheduler-blind: the node has no knowledge of SLURM, MPI, or job arrays; the compute spec is sideloaded by the runner

Distinction between params and compute spec

| Concern | Where defined | Example |
| --- | --- | --- |
| Algorithm params | `__init__` arguments / pipeline YAML `args:` | `model_type`, `threshold`, `diameter` |
| Compute spec | `pipeline-resources.yaml`, per stage `tag` | `partition`, `mem`, `gres`, `batch_size` |

The node class never reads the resource config. This allows the same YAML pipeline to run locally (no resource config) or distributed (resource config supplied separately).
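The separation can be seen in how a runner might wire a stage together. A minimal sketch, where the dict literals stand in for the parsed YAML files and the runner logic is illustrative, not pyCyto API:

```python
# Parsed pipeline YAML entry: the only thing the node constructor sees.
stage = {
    "name": "ThresholdSegmentation",
    "tag": "Threshold_TCell",
    "args": {"threshold": 0.4},
}
# Parsed resource YAML, keyed by stage tag: consumed by the runner only.
resources = {"Threshold_TCell": {"partition": "short", "mem": "16G"}}


class ThresholdSegmentation:
    def __init__(self, threshold=0.5, verbose=True):
        self.threshold = threshold
        self.verbose = verbose


node = ThresholdSegmentation(**stage["args"])   # algorithm params only
spec = resources.get(stage["tag"], {})          # compute spec, never passed to the node
```

Running locally then amounts to `resources = {}`: the lookup comes back empty and the node is constructed identically.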


Dict I/O Contract

Nodes communicate exclusively through a Python dict with standardized keys:

| Key | Type | Shape / format | Direction |
| --- | --- | --- | --- |
| `"image"` | Dask or NumPy array | `(T, Y, X)`, float32 | in / out |
| `"label"` | Dask or NumPy array | `(T, Y, X)`, uint32, background = 0 | in / out |
| `"dataframe"` | pandas or Dask DataFrame | one row per cell per frame | in / out |
| `"network"` | NetworkX Graph | nodes = cell IDs, edges = contacts | in / out |

Nodes consume only the keys they need and return only the keys they produce. They may pass through keys unchanged, but must not silently modify keys they are not responsible for.
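One way to honor this rule is for the runner to merge each node's output back into the shared dict, so keys a node does not touch flow through automatically. A sketch under that assumption (`run_stage` is illustrative, not pyCyto API):

```python
import numpy as np


def run_stage(node, data: dict) -> dict:
    # Merge the node's output keys over the shared dict: untouched keys
    # survive, produced keys are added or overwritten.
    return {**data, **node(data)}


class ThresholdSegmentation:
    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def __call__(self, data: dict) -> dict:
        return {"label": (data["image"] > self.threshold).astype("uint32")}


data = {"image": np.random.rand(2, 8, 8).astype("float32"), "dataframe": None}
data = run_stage(ThresholdSegmentation(threshold=0.4), data)
# "image" and "dataframe" pass through unchanged; "label" is added
```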

Input/output key table by stage

| Stage | Reads | Writes |
| --- | --- | --- |
| Preprocessing | `"image"` | `"image"` |
| Segmentation | `"image"` (or `"label"` for refinement) | `"label"` |
| Tabulation | `"image"`, `"label"` | `"dataframe"` |
| Tracking | `"dataframe"` | `"dataframe"` (adds `track_id`) |
| Postprocessing | any combination | any combination |
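As an illustration of the Tabulation stage, a node that reads `"image"` and `"label"` and writes `"dataframe"` might look like this (a minimal sketch; the node name and measured columns are illustrative):

```python
import numpy as np
import pandas as pd


class SimpleTabulation:
    """Hypothetical tabulation node: one row per labeled cell per frame."""

    def __call__(self, data: dict) -> dict:
        image, label = data["image"], data["label"]
        rows = []
        for t in range(label.shape[0]):                # iterate over frames
            for cell_id in np.unique(label[t]):
                if cell_id == 0:                       # skip background
                    continue
                mask = label[t] == cell_id
                rows.append({
                    "frame": t,
                    "cell_id": int(cell_id),
                    "area": int(mask.sum()),
                    "mean_intensity": float(image[t][mask].mean()),
                })
        return {"dataframe": pd.DataFrame(rows)}


label = np.zeros((2, 4, 4), dtype="uint32")
label[0, :2, :2] = 1
label[0, 2:, 2:] = 2
label[1, :2, :2] = 1
image = np.ones((2, 4, 4), dtype="float32")
df = SimpleTabulation()({"image": image, "label": label})["dataframe"]
# three rows: cells 1 and 2 in frame 0, cell 1 in frame 1
```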


Minimal End-to-End Example

1. Define the node

```python
from tqdm import tqdm


class ThresholdSegmentation:
    def __init__(self, threshold=0.5, verbose=True):
        self.name      = "ThresholdSegmentation"
        self.threshold = threshold
        self.verbose   = verbose

    def __call__(self, data: dict) -> dict:
        image = data["image"]                      # read
        if self.verbose:
            tqdm.write(f"[{self.name}] {image.shape}")
        label = (image > self.threshold).astype("uint32")
        return {"label": label}                    # write
```

2. Call it from Python / Jupyter

```python
import numpy as np
from my_module import ThresholdSegmentation

seg = ThresholdSegmentation(threshold=0.4)
result = seg({"image": np.random.rand(10, 512, 512).astype("float32")})
print(result["label"].shape)    # (10, 512, 512)
```

3. Wire it into a pipeline YAML

```yaml
pipeline:
  segmentation:
    - name: ThresholdSegmentation
      tag: Threshold_TCell
      channels: [TCell]
      input_type: image
      args:
        threshold: 0.4
        verbose: true
      output_type: label
      output: true
```

4. Add a resource spec for SLURM

```yaml
# configs/distributed/pipeline-resources.yaml
pipeline:
  segmentation:
    Threshold_TCell:
      partition: short
      cpus-per-task: 4
      mem: 16G
      time: "01:00:00"
      batch_size: 100
      dependency: Normalize
      dependency_type: afterok
```

Dask Arrays

Nodes should return Dask arrays (lazy) rather than NumPy arrays (eager) wherever possible. This allows downstream stages to fuse operations and avoids loading the full time-series into memory.

```python
import dask.array as da

# Prefer: lazy, chunk-wise evaluation
result = da.map_blocks(my_fn, image, dtype="float32")

# Avoid unless necessary: forces the full array into memory
result = my_fn(image.compute())
```

Call `.compute()` only at the final write step (`output: true`), not inside node logic.
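Applied to the threshold example, a lazy node can defer all work to `map_blocks` (a sketch; `LazyThresholdSegmentation` is illustrative, not part of pyCyto):

```python
import dask.array as da


class LazyThresholdSegmentation:
    """Hypothetical lazy variant: builds a task graph, loads no data."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def __call__(self, data: dict) -> dict:
        label = da.map_blocks(
            lambda block: (block > self.threshold).astype("uint32"),
            data["image"],
            dtype="uint32",
        )
        return {"label": label}                # still lazy at this point


image = da.random.random((10, 512, 512), chunks=(1, 512, 512)).astype("float32")
result = LazyThresholdSegmentation(threshold=0.4)({"image": image})
# result["label"] is a lazy dask array; nothing is computed until .compute()
```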


Testing a Node in Isolation

```python
import numpy as np
from my_module import ThresholdSegmentation

def test_threshold_seg():
    rng   = np.random.default_rng(0)
    image = rng.random((5, 64, 64), dtype="float32")

    seg    = ThresholdSegmentation(threshold=0.5, verbose=False)
    result = seg({"image": image})

    assert "label" in result
    assert result["label"].shape == image.shape
    assert result["label"].dtype == np.uint32
```

See Boilerplate Templates for a complete test stub.


See Also