import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from typing import Any
import pandas as pd
from tqdm import tqdm
import os
import operator
import numpy as np
from scipy.optimize import curve_fit
import dask.dataframe as dd
# [docs]  (Sphinx doc-build link artifact)
class CrossTableOperation(object):
    # Supported element-wise binary operations between the two tables.
    # Originally only "divide" was implemented; the map generalizes it
    # while keeping "divide" behavior unchanged.
    OPS = {
        "divide": operator.truediv,
        "multiply": operator.mul,
        "add": operator.add,
        "subtract": operator.sub,
    }

    def __init__(self, column, operation, verbose=True):
        """
        Perform cross pandas table operations with given column name

        Args:
            column (str): Column name of the inter table operation
            operation (str): Data operation; one of "divide", "multiply",
                "add", "subtract"
            verbose (bool): Turn on or off the processing printout
        """
        self.name = "CrossTableOperation"
        self.column = column
        self.operation = operation
        self.verbose = verbose

    def __call__(self, data) -> Any:
        """
        Apply the configured operation between the first two tables.

        Args:
            data (dict): Must contain "features", a sequence of at least two
                DataFrames that share ``self.column``.

        Returns:
            dict: {"feature": DataFrame} where the first table gains a new
            column "{column}_{operation}" holding the element-wise result.
            An unrecognized operation leaves the table unchanged (matching
            the original behavior).
        """
        features = data["features"]
        features_out = features[0]
        if self.operation in self.OPS:
            # Element-wise operation, aligned on the shared index by pandas.
            features_out["{}_{}".format(self.column, self.operation)] = self.OPS[self.operation](
                features[0][self.column], features[1][self.column]
            )
        return {"feature": features_out}
# [docs]  (Sphinx doc-build link artifact)
class FeatureFilter(object):
    # Translate a comparison symbol into the matching 'operator' function.
    OPS = {
        '<': operator.lt,
        '>': operator.gt,
        '==': operator.eq,
        '!=': operator.ne,
        '<=': operator.le,
        '>=': operator.ge,
    }

    def __init__(self, column, operation, value, verbose=True):
        """
        Filter rows of a pandas table by comparing one column against a value.

        Args:
            column (str): Column name to apply the feature filter
            operation (str): Comparison symbols in string
            value (double): Feature filter value
        """
        self.name = "FeatureFilter"
        self.column = column
        self.operation = operation
        self.value = value
        self.verbose = verbose

    def __call__(self, data) -> Any:
        table = data["feature"]
        compare = self.OPS[self.operation]
        # Boolean mask: True for rows satisfying the comparison.
        keep = compare(table[self.column], self.value)
        return {"feature": table[keep]}
# [docs]  (Sphinx doc-build link artifact)
def intensity_norm(df, channel="mean", suffix="norm", lower=0, upper=1):
    """
    Calculate the normalized cell signal by user given lower and upper bounds and add it as a new column in the dataframe.

    Parameters:
    - df: pandas DataFrame, containing the fluorescence intensity data.
    - channel: str, column name for normalization to take place. (default=mean)
    - suffix: str, suffix appending to the normalized channel output in format of {channel}_{suffix} (default=norm).
    - lower: float, lower bound value for min clipping range (default: 0)
    - upper: float, upper bound value for max clipping range (default: 1)

    Returns:
    - df: pandas DataFrame, with a new column '{channel}_{suffix}' containing the normalized signal.
    """
    out_col = "{}_{}".format(channel, suffix)
    # Clip to the user-given bounds before min/max rescaling.
    df[out_col] = df[channel].clip(lower=lower, upper=upper)
    col_min = df[out_col].min()
    col_max = df[out_col].max()
    # BUG FIX: the denominator previously hard-coded the "norm" suffix
    # ("{}_norm"), raising KeyError for any other suffix.
    df[out_col] = (df[out_col] - col_min) / (col_max - col_min)
    return df
# [docs]  (Sphinx doc-build link artifact)
def intensity_norm_percentile(df, channel="mean", suffix="norm", percentile=1):
    """
    Calculate the normalized cell signal by percentile clippings and add it as a new column in the dataframe.

    Parameters:
    - df: pandas DataFrame, containing the fluorescence intensity data.
    - channel: str, column name for normalization to take place. (default=mean)
    - suffix: str, suffix appending to the normalized channel output in format of {channel}_{suffix} (default=norm).
    - percentile: float, percentile value for min/max range (default: 1)

    Returns:
    - df: pandas DataFrame, with a new column '{channel}_{suffix}' containing the normalized signal.
    - lp: float, the lower percentile value.
    - up: float, the upper percentile value.
    """
    frac = percentile / 100.
    # Symmetric clipping bounds taken from the channel's distribution.
    lp = df[channel].quantile(frac)
    up = df[channel].quantile(1 - frac)
    normalized = intensity_norm(df, channel=channel, suffix=suffix, lower=lp, upper=up)
    return normalized, lp, up
# [docs]  (Sphinx doc-build link artifact)
def left_table_merge(df1, df2, on=None):
    """
    Perform a left merge between two DataFrames (pandas or Dask),
    keeping only unique columns and renaming overlapping columns.

    Parameters:
    df1: pandas.DataFrame or dask.dataframe.DataFrame
        The left DataFrame.
    df2: pandas.DataFrame or dask.dataframe.DataFrame
        The right DataFrame.
    on: list or str
        The column(s) to merge on.

    Returns:
    A DataFrame (pandas or Dask) resulting from the left merge; on
    overlapping (non-key) columns, df1's values win.
    """
    # Perform the merge; overlapping columns get '_left'/'_right' suffixes.
    merged = df1.merge(df2, on=on, suffixes=('_left', '_right'), how='left')
    # Keep df1's columns plus any non-conflicting columns from df2.
    columns_to_keep = [col for col in merged.columns if not col.endswith('_right')]
    # BUG FIX: strip only the trailing '_left' suffix. str.replace removed
    # every occurrence, corrupting names that contain '_left' internally
    # (e.g. 'a_left_left' collapsed to 'a').
    rename_dict = {col: col[:-len('_left')] for col in columns_to_keep if col.endswith('_left')}
    return merged[columns_to_keep].rename(columns=rename_dict)
# [docs]  (Sphinx doc-build link artifact)
def cal_viability(df, pos_cols=(), neg_cols=()):
    """
    Compute a per-row viability score from positive and negative marker columns.

    The score is a logistic-style combination: each positive column contributes
    exp(-10*(x - 0.5)) and each negative column exp(-10*(0.5 - x)); viability is
    1 / (1 + mean_of_exponentials), so values near 1 mean "viable".

    Parameters:
    - df: pandas DataFrame containing the marker columns (values presumably
      normalized to [0, 1] — TODO confirm with callers).
    - pos_cols: sequence of column names of positive (live) markers.
    - neg_cols: sequence of column names of negative (death) markers.

    Returns:
    - df: input DataFrame with a new "viability" column.

    Raises:
    - ZeroDivisionError if both pos_cols and neg_cols are empty (unchanged
      from the original behavior).
    """
    # Vectorized over rows (resolves the original "speed up" TODO) instead of
    # a per-row DataFrame.apply; the arithmetic is identical.
    # Also: mutable default args ([]) replaced by immutable tuples.
    exp_sum = np.zeros(len(df))
    for col in pos_cols:
        exp_sum += np.exp(-10 * (df[col].to_numpy() - 0.5))
    for col in neg_cols:
        exp_sum += np.exp(-10 * (0.5 - df[col].to_numpy()))
    df["viability"] = 1 / (1 + exp_sum / (len(pos_cols) + len(neg_cols)))
    return df
# [docs]  (Sphinx doc-build link artifact)
def calculate_cdi(df, viability_col, death_col, out_col="CDI"):
    """
    Calculate the Cell Death Index (CDI) and add it as a new column in the dataframe.

    CDI = I_death_norm / (I_death_norm + I_live_norm)

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame containing the fluorescence intensity data.
    viability_col : str
        Column name for the viability channel (e.g., live cell marker).
    death_col : str
        Column name for the death channel (e.g., dead cell marker).
    out_col : str
        Column name for CDI output (default: ``"CDI"``).

    Returns
    -------
    pandas.DataFrame
        Input DataFrame with a new column containing the calculated CDI values.
    """
    # Tiny epsilon keeps the ratio defined when both channels are zero.
    epsilon = 1e-10
    denominator = df[viability_col] + df[death_col] + epsilon
    df[out_col] = df[death_col] / denominator
    return df
# [docs]  (Sphinx doc-build link artifact)
def gaussian_kernel(size, sigma):
    """Return a normalized 1D Gaussian kernel with `size` taps and width `sigma`."""
    offsets = np.arange(-size // 2 + 1, size // 2 + 1)
    weights = np.exp(-offsets ** 2 / (2 * sigma ** 2))
    # Normalize so the kernel sums to 1 (preserves signal mean under convolution).
    return weights / weights.sum()
# [docs]  (Sphinx doc-build link artifact)
def apply_gaussian_smoothing(values, sigma):
    """Smooth a 1D signal by convolving it with a normalized Gaussian kernel."""
    # Heuristic support of ~±3 sigma for the kernel width.
    width = int(6 * sigma + 1)
    smoothed = np.convolve(values, gaussian_kernel(width, sigma), mode='same')
    return smoothed
# [docs]  (Sphinx doc-build link artifact)
def compute_central_difference(values):
    """
    Compute the gradient of a 1D array with central finite differences,
    falling back to one-sided differences at the two endpoints.

    Parameters:
    - values: 1D array-like with at least 2 elements.

    Returns:
    - np.ndarray of floats, same length as `values`.
    """
    values = np.asarray(values)
    # BUG FIX: allocate a float output. np.zeros_like on an integer input
    # silently truncated the fractional central differences.
    gradient = np.zeros(values.shape, dtype=float)
    gradient[1:-1] = (values[2:] - values[:-2]) / 2  # central difference
    gradient[0] = values[1] - values[0]              # forward difference at the start
    gradient[-1] = values[-1] - values[-2]           # backward difference at the end
    return gradient
# [docs]  (Sphinx doc-build link artifact)
def compute_moving_average(data, window_size):
    """Return the centered moving average of `data` over `window_size` samples."""
    box = np.full(window_size, 1.0 / window_size)
    # 'same' keeps the output length equal to the input (edges are zero-padded).
    return np.convolve(data, box, mode='same')
# [docs]  (Sphinx doc-build link artifact)
def model_func(x, a, b, c):
    """Quadratic model a*x^2 + b*x + c used as the curve-fitting target."""
    # Swap in a different model here if another functional form is needed.
    return a * x * x + b * x + c
# [docs]  (Sphinx doc-build link artifact)
def fit_model(x, y):
    """Fit `model_func` to (x, y) with scipy's curve_fit and return the fitted values."""
    # The quadratic model needs at least 3 points; otherwise return the raw data.
    if len(x) < 3:
        return y
    popt, _ = curve_fit(model_func, x, y)
    return model_func(x, *popt)
# [docs]  (Sphinx doc-build link artifact)
def compute_smoothed_gradient(df, track_id_col='track_id', frame_col='frame', value_col='scalar_value', sigma=2, ma_window=10, smooth_method="gaussian", grad_method="forward"):
    """
    Computes a smoothed time gradient for noisy scalar values in a pandas DataFrame.

    Pipeline: sort by (track, frame) -> fill missing frames per track by linear
    interpolation -> smooth the values per track -> differentiate per track ->
    left-merge the smoothed/gradient columns back onto the original rows.

    Parameters:
    - df: pd.DataFrame containing the data with columns for track ID, frame, and scalar values.
    - track_id_col: str, name of the column representing track IDs.
    - frame_col: str, name of the column representing frames (time points);
      must be integer-valued, since missing frames are reindexed over
      range(min, max + 1).
    - value_col: str, name of the column representing scalar values.
    - sigma: float, the standard deviation for Gaussian smoothing (controls the smoothness).
    - ma_window: int, moving average window size
    - smooth_method: str, smooth method between Gaussian smoothing, Levenberg Marquardt method and moving average filter, allowed options: "gaussian", "lm", "ma"
    - grad_method: str, numerical gradient difference method, allow options: "forward", "central"

    Returns:
    - pd.DataFrame with additional columns "{value_col}_smoothed" and
      "{value_col}_grad"; rows created only by interpolation are discarded
      again by the final left merge onto the original frames.
    """
    # (Disabled) optional repartitioning into a dask DataFrame for large inputs.
    # if isinstance(df, pd.DataFrame):
    #     # Estimate the size of the DataFrame in bytes
    #     df_size = df.memory_usage(deep=True).sum()
    #     # Define a target partition size (e.g., 1 GB)
    #     target_partition_size = 0.1*1024 * 1024 * 1024 # 1GB
    #     # Calculate the number of partitions
    #     npartitions = max(1, int(df_size / target_partition_size)) # At least 1 partition
    #     print("npartitions:", npartitions)
    #     df = dd.from_pandas(df,npartitions)
    # Step 1: Sort the DataFrame by track_id and frame
    df_sorted = df.sort_values(by=[track_id_col, frame_col])
    # Step 2: Interpolate missing frames within each track_id.
    # reindex() inserts NaN rows for missing frames; interpolate() fills every
    # column linearly (track_id is constant within a group, so it is restored).
    df_interpolated = df_sorted.groupby(track_id_col).apply(
        lambda group: group.set_index(frame_col).reindex(
            range(group[frame_col].min(), group[frame_col].max() + 1)
        ).interpolate().reset_index()
    ).reset_index(drop=True)
    # Step 3: Smooth the scalar values per track.
    # NOTE(review): an unrecognized smooth_method leaves "{value_col}_smoothed"
    # undefined, making Step 4 / Step 5 raise KeyError.
    if smooth_method == "gaussian":
        df_interpolated["{}_smoothed".format(value_col)] = df_interpolated.groupby(track_id_col)[value_col].transform(
            lambda x: apply_gaussian_smoothing(x, sigma=sigma)
        )
    elif smooth_method == "lm":
        # NOTE(review): groupby().apply here yields one array per track keyed by
        # track_id; assigning that to a row-indexed column looks misaligned —
        # verify the "lm" path before relying on it (a transform-style
        # assignment as in the other branches is the expected shape).
        df_interpolated["{}_smoothed".format(value_col)] = df_interpolated.groupby(track_id_col).apply(lambda df: fit_model(df[frame_col].values,df[value_col].values))
    elif smooth_method == "ma":
        df_interpolated["{}_smoothed".format(value_col)] = df_interpolated.groupby(track_id_col)[value_col].transform(
            lambda x: compute_moving_average(x, window_size=ma_window)
        )
    # Step 4: Compute the gradient (rate of change) of the smoothed scalar values
    if grad_method == "forward":
        # np.gradient: central differences inside, one-sided at the ends.
        df_interpolated['{}_grad'.format(value_col)] = df_interpolated.groupby(track_id_col)["{}_smoothed".format(value_col)].transform(np.gradient)
    elif grad_method == "central":
        # central finite difference method
        df_interpolated['{}_grad'.format(value_col)] = df_interpolated.groupby(track_id_col)["{}_smoothed".format(value_col)].transform(
            lambda x: compute_central_difference(x.values)
        )
    # Step 5: Merge the interpolated data back to the original dataframe based on track_id and frame
    result = pd.merge(df, df_interpolated[[track_id_col, frame_col, "{}_smoothed".format(value_col), '{}_grad'.format(value_col)]],
                      on=[track_id_col, frame_col], how='left')
    return result
from scipy.signal import savgol_filter
# [docs]  (Sphinx doc-build link artifact)
def compute_savgol_filter(df, track_id_col="track_id", frame_col="frame", value_col='scalar_value', window_length=10, polyorder=3):
    """
    Applies the Savitzky-Golay filter to smooth the signal and compute its derivative.

    Small pandas inputs are converted to a partitioned dask DataFrame; each
    track is gap-filled by interpolation, smoothed with savgol_filter, and
    differentiated with np.gradient, then merged back onto the original rows.

    Parameters:
    -----------
    df : pd.DataFrame
        DataFrame containing the time and signal columns.
    track_id_col: str
        Column name of the cell track id in the DataFrame.
    frame_col : str
        Column name of the time data in the DataFrame; must be integer-valued
        (it is reindexed over range(min, max + 1) and cast with astype(int)).
    value_col : str
        Column name of the value data in the DataFrame.
    window_length : int, optional
        The length of the filter window, default is 10.
        NOTE(review): scipy's savgol_filter historically requires an odd
        window_length; the even default may raise depending on the installed
        scipy version — confirm.
    polyorder : int, optional
        The order of the polynomial used to fit the samples, default is 3.

    Returns:
    --------
    df : pd.DataFrame (or dask DataFrame, depending on the merge dispatch)
        The input DataFrame with additional columns
        "{value_col}_smoothed" and "{value_col}_grad".
    """
    if isinstance(df, pd.DataFrame):
        # Estimate the size of the DataFrame in bytes
        df_size = df.memory_usage(deep=True).sum()
        # Define a target partition size (~0.1 GB per partition)
        target_partition_size = 0.1*1024 * 1024 * 1024 # 1GB
        # Calculate the number of partitions
        npartitions = max(1, int(df_size / target_partition_size)) # At least 1 partition
        # Diagnostic printout of the chosen partition count.
        print("npartitions:", npartitions)
        ddf = dd.from_pandas(df,npartitions)
    else:
        ddf = df
    # Step 1: Sort the DataFrame by track_id and frame
    ddf_sorted = ddf.sort_values(by=[track_id_col, frame_col])
    # Step 2: Interpolate missing frames within each track_id
    def interpolate(group):
        # Remove duplicate indices before reindexing
        group = group.drop_duplicates(subset=[frame_col])
        # Insert NaN rows for any missing frames in this track's range.
        res = group.set_index(frame_col).reindex(
            range(group[frame_col].min(), group[frame_col].max() + 1)
        )
        # Only the value column is interpolated; other columns keep NaNs
        # in the inserted rows (track_id is re-cast to int further below).
        res[value_col] = res[value_col].interpolate()
        # Reset the index
        res = res.reset_index()
        return res
    df_interpolated = ddf_sorted.groupby(track_id_col).apply(interpolate).reset_index(drop=True)
    # Step 3: Compute smoothed signal using Savitzky-Golay filter
    def apply_savgol(group):
        # Cap the polynomial order below the number of samples in the group.
        if polyorder >= len(group[value_col]):
            polyorder_ = len(group[value_col])-1
        else:
            polyorder_ = polyorder
        # Shrink the window for short tracks so savgol_filter stays valid.
        if len(group[value_col]) < window_length:
            group["{}_smoothed".format(value_col)] = savgol_filter(group[value_col], window_length=len(group[value_col]), polyorder=polyorder_)
        else:
            group["{}_smoothed".format(value_col)] = savgol_filter(group[value_col], window_length=window_length, polyorder=polyorder_)
        # group["{}_smoothed".format(value_col)] = group[value_col]
        # Compute the first derivative
        # group["{}_grad".format(value_col)] = savgol_filter(group[value_col], window_length=window_length, polyorder=polyorder, deriv=1, delta=1)
        # np.gradient needs at least 2 samples; single-point tracks get NaN.
        if len(group["{}_smoothed".format(value_col)]) >= 2:
            group["{}_grad".format(value_col)] = np.gradient(group["{}_smoothed".format(value_col)])
        else:
            group["{}_grad".format(value_col)] = np.nan
        return group
    df_interpolated = df_interpolated.groupby(track_id_col).apply(apply_savgol).reset_index(drop=True)
    # Step 5: Merge the interpolated data back to the original dataframe based on track_id and frame
    # Cast key columns to int on both sides so the merge keys have matching dtypes.
    df[track_id_col] = df[track_id_col].astype(int)
    df[frame_col] = df[frame_col].astype(int)
    df_interpolated[track_id_col] = df_interpolated[track_id_col].astype(int)
    df_interpolated[frame_col] = df_interpolated[frame_col].astype(int)
    result = dd.merge(df, df_interpolated[[track_id_col, frame_col, "{}_smoothed".format(value_col), '{}_grad'.format(value_col)]],
                      on=[track_id_col, frame_col], how='left')
    return result