Source code for cyto.utils.kinematics
import pandas as pd
import dask.dataframe as dd
import numpy as np
import dask.array as da
[docs]
def cal_kinematics(tracks, x_col="x", y_col="y", frame_col="frame", track_id_col="track_id",chuck_size=2000, verbose=False):
if isinstance(tracks, pd.DataFrame):
tracks_df = tracks
if verbose:
print(f"Input: pandas DataFrame with {len(tracks_df)} rows")
else:
tracks_df = tracks.compute() if hasattr(tracks, 'compute') else tracks
if verbose:
print(f"Input: converted to pandas DataFrame with {len(tracks_df)} rows")
try:
if verbose:
print("Step 1: Sorting by track and frame...")
# Direct sort is orders of magnitude faster than groupby().apply()
sorted_df = tracks_df.sort_values([track_id_col, frame_col]).reset_index(drop=True)
if verbose:
print("Step 2: Computing dx, dy, dt from previous points...")
# Compute shifts
sorted_df["dx_from_previous_point"] = sorted_df.groupby(track_id_col)[x_col].shift(-1) - sorted_df[x_col]
sorted_df["dy_from_previous_point"] = sorted_df.groupby(track_id_col)[y_col].shift(-1) - sorted_df[y_col]
sorted_df["dt"] = sorted_df.groupby(track_id_col)[frame_col].shift(-1) - sorted_df[frame_col]
if verbose:
print("Step 3: Renaming columns and filling NaN...")
sorted_df = sorted_df.rename(columns={
"dx_from_previous_point": "dx from previous point",
"dy_from_previous_point": "dy from previous point",
"dt": "dt"
}).fillna(0)
if verbose:
print("Step 4: Computing cumulative time...")
# cumulative time
sorted_df["dt acc"] = sorted_df.groupby([track_id_col])[frame_col].transform("min")
sorted_df["dt acc"] = sorted_df[frame_col] - sorted_df["dt acc"]
if verbose:
print("Step 5: Computing displacement from previous point...")
sorted_df["displacement from previous point"] = np.sqrt(
sorted_df["dx from previous point"]**2 + sorted_df["dy from previous point"]**2
).fillna(0)
if verbose:
print("Step 6: Computing displacement from origin...")
sorted_df["dx from origin"] = sorted_df.groupby([track_id_col])['dx from previous point'].cumsum().fillna(0)
sorted_df["dy from origin"] = sorted_df.groupby([track_id_col])['dy from previous point'].cumsum().fillna(0)
sorted_df["displacement from origin"] = np.sqrt(
sorted_df["dx from origin"]**2 + sorted_df["dy from origin"]**2
)
if verbose:
print("Step 7: Computing absolute displacements...")
sorted_df["dx acc"] = sorted_df['dx from previous point'].abs().fillna(0)
sorted_df["dy acc"] = sorted_df['dy from previous point'].abs().fillna(0)
sorted_df["dx acc"] = sorted_df.groupby([track_id_col])['dx acc'].cumsum().fillna(0)
sorted_df["dy acc"] = sorted_df.groupby([track_id_col])['dy acc'].cumsum().fillna(0)
if verbose:
print("Step 8: Computing distance traveled and path efficiency...")
sorted_df["distance traveled"] = sorted_df.groupby([track_id_col])['displacement from previous point'].cumsum().fillna(0)
sorted_df["path efficiency"] = (sorted_df["distance traveled"]/sorted_df["displacement from origin"]).fillna(0).replace([np.inf, -np.inf], 0)
if verbose:
print("Step 9: Computing velocities and speeds...")
# velocity
sorted_df["vel_x"] = (sorted_df["dx from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0)
sorted_df["vel_y"] = (sorted_df["dy from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0)
sorted_df["speed"] = (sorted_df["displacement from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0)
sorted_df["average speed"] = (sorted_df["distance traveled"]/sorted_df["dt acc"]).fillna(0).replace([np.inf, -np.inf], 0)
if verbose:
print("Step 10: Final result ready!")
print(f"Output DataFrame shape: {sorted_df.shape}")
return sorted_df
except Exception as e:
print(f"ERROR in step: {e}")
import traceback
traceback.print_exc()
raise
[docs]
def cal_msd(tracks):
if isinstance(tracks, pd.DataFrame):
tracks_ddf = dd.from_pandas(tracks, chunksize=2000)
else:
tracks_ddf = tracks
tracks_ddf["dr2"] = tracks_ddf["displacement from origin"]**2
msd_mean = tracks_ddf.groupby("frame")["dr2"].mean().to_frame(name='msd')
msd_sd = tracks_ddf.groupby("frame")["dr2"].std().to_frame(name='sd')
msd = dd.concat([msd_mean, msd_sd], axis=1)
msd = msd.reset_index().sort_values(by=["frame"])
return msd.compute()
[docs]
def compute_msd_time_lag(time_lag, data):
msd_list = []
for t in data['frame'].unique():
t1 = t
t2 = t + time_lag
if t2 in data['frame'].values:
displacement_sq = data[(data['frame'] == t2)]['displacement squared'].values
msd = np.mean(np.abs(displacement_sq - data[(data['frame'] == t1)]['displacement squared'].values))
msd_list.append(msd)
return np.mean(msd_list) if msd_list else np.nan, np.std(msd_list) if msd_list else np.nan
[docs]
def compute_msd_track_vectorized(track_data):
"""
Compute time-lagged MSD for all lags at once for a single track.
Replaces calling compute_msd_time_lag in a loop: O(T²) with vectorised
numpy inner operations instead of O(T³) with pure-Python nested loops.
Args:
track_data (pd.DataFrame): Single-track data with 'frame' and
'displacement squared' columns.
Returns:
tuple: (time_lags np.ndarray, msd_values np.ndarray)
"""
track_sorted = track_data.sort_values('frame')
frames = track_sorted['frame'].values.astype(int)
disp_sq = track_sorted['displacement squared'].values.astype(float)
f_min, f_max = frames[0], frames[-1]
n_frames = f_max - f_min + 1
# Dense array indexed by frame offset; NaN for any missing frames
dense = np.full(n_frames, np.nan)
dense[frames - f_min] = disp_sq
lag_values, msd_values = [], []
for lag in range(1, n_frames):
a, b = dense[:-lag], dense[lag:]
valid = ~np.isnan(a) & ~np.isnan(b)
if not valid.any():
break
lag_values.append(lag)
msd_values.append(float(np.mean(np.abs(b[valid] - a[valid]))))
return np.array(lag_values, dtype=float), np.array(msd_values, dtype=float)