Source code for cyto.utils.kinematics

import pandas as pd
import dask.dataframe as dd
import numpy as np
import dask.array as da

[docs] def cal_kinematics(tracks, x_col="x", y_col="y", frame_col="frame", track_id_col="track_id",chuck_size=2000, verbose=False): if isinstance(tracks, pd.DataFrame): tracks_df = tracks if verbose: print(f"Input: pandas DataFrame with {len(tracks_df)} rows") else: tracks_df = tracks.compute() if hasattr(tracks, 'compute') else tracks if verbose: print(f"Input: converted to pandas DataFrame with {len(tracks_df)} rows") try: if verbose: print("Step 1: Sorting by track and frame...") # Direct sort is orders of magnitude faster than groupby().apply() sorted_df = tracks_df.sort_values([track_id_col, frame_col]).reset_index(drop=True) if verbose: print("Step 2: Computing dx, dy, dt from previous points...") # Compute shifts sorted_df["dx_from_previous_point"] = sorted_df.groupby(track_id_col)[x_col].shift(-1) - sorted_df[x_col] sorted_df["dy_from_previous_point"] = sorted_df.groupby(track_id_col)[y_col].shift(-1) - sorted_df[y_col] sorted_df["dt"] = sorted_df.groupby(track_id_col)[frame_col].shift(-1) - sorted_df[frame_col] if verbose: print("Step 3: Renaming columns and filling NaN...") sorted_df = sorted_df.rename(columns={ "dx_from_previous_point": "dx from previous point", "dy_from_previous_point": "dy from previous point", "dt": "dt" }).fillna(0) if verbose: print("Step 4: Computing cumulative time...") # cumulative time sorted_df["dt acc"] = sorted_df.groupby([track_id_col])[frame_col].transform("min") sorted_df["dt acc"] = sorted_df[frame_col] - sorted_df["dt acc"] if verbose: print("Step 5: Computing displacement from previous point...") sorted_df["displacement from previous point"] = np.sqrt( sorted_df["dx from previous point"]**2 + sorted_df["dy from previous point"]**2 ).fillna(0) if verbose: print("Step 6: Computing displacement from origin...") sorted_df["dx from origin"] = sorted_df.groupby([track_id_col])['dx from previous point'].cumsum().fillna(0) sorted_df["dy from origin"] = sorted_df.groupby([track_id_col])['dy from previous point'].cumsum().fillna(0) sorted_df["displacement from origin"] = np.sqrt( sorted_df["dx from origin"]**2 + sorted_df["dy from origin"]**2 ) if verbose: print("Step 7: Computing absolute displacements...") sorted_df["dx acc"] = sorted_df['dx from previous point'].abs().fillna(0) sorted_df["dy acc"] = sorted_df['dy from previous point'].abs().fillna(0) sorted_df["dx acc"] = sorted_df.groupby([track_id_col])['dx acc'].cumsum().fillna(0) sorted_df["dy acc"] = sorted_df.groupby([track_id_col])['dy acc'].cumsum().fillna(0) if verbose: print("Step 8: Computing distance traveled and path efficiency...") sorted_df["distance traveled"] = sorted_df.groupby([track_id_col])['displacement from previous point'].cumsum().fillna(0) sorted_df["path efficiency"] = (sorted_df["distance traveled"]/sorted_df["displacement from origin"]).fillna(0).replace([np.inf, -np.inf], 0) if verbose: print("Step 9: Computing velocities and speeds...") # velocity sorted_df["vel_x"] = (sorted_df["dx from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0) sorted_df["vel_y"] = (sorted_df["dy from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0) sorted_df["speed"] = (sorted_df["displacement from previous point"]/sorted_df["dt"]).fillna(0).replace([np.inf, -np.inf], 0) sorted_df["average speed"] = (sorted_df["distance traveled"]/sorted_df["dt acc"]).fillna(0).replace([np.inf, -np.inf], 0) if verbose: print("Step 10: Final result ready!") print(f"Output DataFrame shape: {sorted_df.shape}") return sorted_df except Exception as e: print(f"ERROR in step: {e}") import traceback traceback.print_exc() raise
[docs] def cal_msd(tracks): if isinstance(tracks, pd.DataFrame): tracks_ddf = dd.from_pandas(tracks, chunksize=2000) else: tracks_ddf = tracks tracks_ddf["dr2"] = tracks_ddf["displacement from origin"]**2 msd_mean = tracks_ddf.groupby("frame")["dr2"].mean().to_frame(name='msd') msd_sd = tracks_ddf.groupby("frame")["dr2"].std().to_frame(name='sd') msd = dd.concat([msd_mean, msd_sd], axis=1) msd = msd.reset_index().sort_values(by=["frame"]) return msd.compute()
[docs] def compute_msd_time_lag(time_lag, data): msd_list = [] for t in data['frame'].unique(): t1 = t t2 = t + time_lag if t2 in data['frame'].values: displacement_sq = data[(data['frame'] == t2)]['displacement squared'].values msd = np.mean(np.abs(displacement_sq - data[(data['frame'] == t1)]['displacement squared'].values)) msd_list.append(msd) return np.mean(msd_list) if msd_list else np.nan, np.std(msd_list) if msd_list else np.nan
[docs] def compute_msd_track_vectorized(track_data): """ Compute time-lagged MSD for all lags at once for a single track. Replaces calling compute_msd_time_lag in a loop: O(T²) with vectorised numpy inner operations instead of O(T³) with pure-Python nested loops. Args: track_data (pd.DataFrame): Single-track data with 'frame' and 'displacement squared' columns. Returns: tuple: (time_lags np.ndarray, msd_values np.ndarray) """ track_sorted = track_data.sort_values('frame') frames = track_sorted['frame'].values.astype(int) disp_sq = track_sorted['displacement squared'].values.astype(float) f_min, f_max = frames[0], frames[-1] n_frames = f_max - f_min + 1 # Dense array indexed by frame offset; NaN for any missing frames dense = np.full(n_frames, np.nan) dense[frames - f_min] = disp_sq lag_values, msd_values = [], [] for lag in range(1, n_frames): a, b = dense[:-lag], dense[lag:] valid = ~np.isnan(a) & ~np.isnan(b) if not valid.any(): break lag_values.append(lag) msd_values.append(float(np.mean(np.abs(b[valid] - a[valid])))) return np.array(lag_values, dtype=float), np.array(msd_values, dtype=float)