Spaces:
Sleeping
Sleeping
| from dataclasses import dataclass | |
| import numpy as np | |
| import scipy.linalg as la | |
| from scipy.signal import find_peaks | |
| from math import ceil | |
| def thin_peaks(peak_list, dmin=10, voxel_size=(1,1,1), return_larger_peaks=False): | |
| """ | |
| Remove peaks within a specified distance of each other, retaining the peak with the highest intensity. | |
| Args: | |
| - peak_list (list of PeakData): Each element contains: | |
| - pos (list of float): 3D coordinates of the peak. | |
| - intensity (float): The intensity value of the peak. | |
| - key (tuple): A unique identifier or index for the peak (#trace, #peak) | |
| - dmin (float, optional): Minimum distance between peaks. peaks closer than this threshold will be thinned. Defaults to 10. | |
| - return_larger_peaks (bool, optional): Indicate larger peak for each thinned peak | |
| Returns: | |
| - list of tuples: A list containing keys of the removed peaks. | |
| if return_larger_peaks | |
| - list of tuples: A list containing the keys of the larger peak causing the peak to be removed | |
| Notes: | |
| - The function uses the L2 norm (Euclidean distance) to compute the distance between peaks. | |
| - When two peaks are within `dmin` distance, the peak with the lower intensity is removed. | |
| """ | |
| removed_peaks = [] | |
| removed_larger_peaks = [] | |
| for i in range(len(peak_list)): | |
| if peak_list[i].key in removed_peaks: | |
| continue | |
| for j in range(len(peak_list)): | |
| if i==j: | |
| continue | |
| if peak_list[j].key in removed_peaks: | |
| continue | |
| d = (np.array(peak_list[i].pos) - np.array(peak_list[j].pos))*np.array(voxel_size) | |
| d = la.norm(d) | |
| if d<dmin: | |
| hi = peak_list[i].intensity | |
| hj = peak_list[j].intensity | |
| if hi<hj: | |
| removed_peaks.append(peak_list[i].key) | |
| removed_larger_peaks.append(peak_list[j].key) | |
| break | |
| else: | |
| removed_peaks.append(peak_list[j].key) | |
| removed_larger_peaks.append(peak_list[i].key) | |
| if return_larger_peaks: | |
| return removed_peaks, removed_larger_peaks | |
| else: | |
| return removed_peaks | |
| class CellData(object): | |
| """Represents data related to a single cell. | |
| Attributes: | |
| pathdata_list (list): A list of PathData objects representing the various paths associated with the cell. | |
| """ | |
| pathdata_list: list | |
| class RemovedPeakData(object): | |
| """Represents data related to a removed peak | |
| Attributes: | |
| idx (int): Index of peak along path | |
| screening_peak (tuple): (path_idx, position along path) for screening peak | |
| """ | |
| idx: int | |
| screening_peak: tuple | |
| class PathData(object): | |
| """Represents data related to a specific path in the cell. | |
| This dataclass encapsulates information about the peaks, | |
| the defining points, the fluorescence values, and the path length of a specific path. | |
| Attributes: peaks (list): List of peaks in the path (indicies of positions in points, o_intensity). | |
| removed_peaks (list): List of peaks in the path which have been removed because of a nearby larger peak | |
| points (list): List of points defining the path. | |
| o_intensity (list): List of (unnormalized) fluorescence intensity values along the path | |
| SC_length (float): Length of the path. | |
| """ | |
| peaks: list | |
| removed_peaks: list | |
| points: list | |
| o_intensity: list | |
| SC_length: float | |
| class PeakData(object): | |
| pos: tuple | |
| intensity: float | |
| key: tuple | |
| def find_peaks2(v, distance=5, prominence=0.5): | |
| """ | |
| Find peaks in a 1D array with extended boundary handling. | |
| The function pads the input array at both ends to handle boundary peaks. It then identifies peaks in the extended array | |
| and maps them back to the original input array. | |
| Args: | |
| - v (numpy.ndarray): 1D input array in which to find peaks. | |
| - distance (int, optional): Minimum number of array elements that separate two peaks. Defaults to 5. | |
| - prominence (float, optional): Minimum prominence required for a peak to be identified. Defaults to 0.5. | |
| Returns: | |
| - list of int: List containing the indices of the identified peaks in the original input array. | |
| - dict: Information about the properties of the identified peaks (as returned by scipy.signal.find_peaks). | |
| """ | |
| pad = int(ceil(distance))+1 | |
| v_ext = np.concatenate([np.ones((pad,), dtype=v.dtype)*np.min(v), v, np.ones((pad,), dtype=v.dtype)*np.min(v)]) | |
| assert(len(v_ext) == len(v)+2*pad) | |
| peaks, _ = find_peaks(v_ext, distance=distance, prominence=prominence) | |
| peaks = peaks - pad | |
| n_peaks = [] | |
| for i in peaks: | |
| if 0<=i<len(v): | |
| n_peaks.append(i) | |
| else: | |
| raise Exception | |
| return n_peaks, _ | |
| def process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=10): | |
| """ | |
| Process traces of cells to extract peak information and organize the data. | |
| The function normalizes fluorescence data, finds peaks, refines peak information, | |
| removes unwanted peaks that might be due to close proximity of bright peaks from | |
| other paths, and organizes all the information into a structured data format. | |
| Args: | |
| all_paths (list of list of tuples): A list containing paths, where each path is | |
| represented as a list of 3D coordinate tuples. | |
| path_lengths (list of float): List of path lengths corresponding to the provided paths. | |
| measured_trace_fluorescence (list of list of float): A list containing fluorescence | |
| data corresponding to each path point. | |
| dmin (float): Distance below which brighter peaks screen less bright ones. | |
| Returns: | |
| CellData: An object containing organized peak and path data for a given cell. | |
| Note: | |
| - The function assumes that each path and its corresponding length and fluorescence data | |
| are positioned at the same index in their respective lists. | |
| """ | |
| cell_peaks = [] | |
| for points, o_intensity in zip(all_paths, measured_trace_fluorescence): | |
| # For peak determination normalize each trace to have mean zero and s.d. 1 | |
| intensity_normalized = (o_intensity - np.mean(o_intensity))/np.std(o_intensity) | |
| # Find peaks - these will be further refined later | |
| p,_ = find_peaks2(intensity_normalized, distance=5, prominence=0.5*np.std(intensity_normalized)) | |
| peaks = np.array(p, dtype=np.int32) | |
| # Store peak data - using original values, not normalized ones | |
| peak_mean_heights = [ o_intensity[u] for u in peaks ] | |
| peak_points = [ points[u] for u in peaks ] | |
| cell_peaks.append((peaks, peak_points, peak_mean_heights)) | |
| # Eliminate peaks which have another larger peak nearby (in 3D space, on any chromosome). | |
| # This aims to remove small peaks in the mean intensity generated when an SC passes close | |
| # to a bright peak on another SC - this is nearby in space, but brighter. | |
| to_thin = [] | |
| for k in range(len(cell_peaks)): | |
| for u in range(len(cell_peaks[k][0])): | |
| to_thin.append(PeakData(pos=cell_peaks[k][1][u], intensity=cell_peaks[k][2][u], key=(k, u))) | |
| # Exclude any peak with a nearby brighter peak (on any SC) | |
| removed_peaks, removed_larger_peaks = thin_peaks(to_thin, return_larger_peaks=True, dmin=dmin) | |
| # Clean up and remove these peaks | |
| new_cell_peaks = [] | |
| removed_cell_peaks = [] | |
| removed_cell_peaks_larger = [] | |
| for path_idx in range(len(cell_peaks)): | |
| path_retained_peaks = [] | |
| path_removed_peaks = [] | |
| path_peaks = cell_peaks[path_idx][0] | |
| for peak_idx in range(len(path_peaks)): | |
| if (path_idx, peak_idx) not in removed_peaks: | |
| path_retained_peaks.append(path_peaks[peak_idx]) | |
| else: | |
| # What's the larger point? | |
| idx = removed_peaks.index((path_idx, peak_idx)) | |
| larger_path, larger_idx = removed_larger_peaks[idx] | |
| path_removed_peaks.append(RemovedPeakData(idx=path_peaks[peak_idx], screening_peak=(larger_path, cell_peaks[larger_path][0][larger_idx]))) | |
| ### | |
| new_cell_peaks.append(path_retained_peaks) | |
| removed_cell_peaks.append(path_removed_peaks) | |
| cell_peaks = new_cell_peaks | |
| pd_list = [] | |
| # Save peak positions, absolute intensity intensities, and length for each SC | |
| for k in range(len(all_paths)): | |
| points, o_intensity = all_paths[k], measured_trace_fluorescence[k] | |
| peaks = cell_peaks[k] | |
| removed_peaks = removed_cell_peaks[k] | |
| pd = PathData(peaks=peaks, removed_peaks=removed_peaks, points=points, o_intensity=o_intensity, SC_length=path_lengths[k]) | |
| pd_list.append(pd) | |
| cd = CellData(pathdata_list=pd_list) | |
| return cd | |
| alpha_max = 0.4 | |
| # Criterion used for identifying peak as a focus - normalized (with mean and s.d.) | |
| # intensity levels being above 0.4 time maximum peak level | |
| def focus_criterion(pos, v, alpha=alpha_max): | |
| """ | |
| Identify and return positions where values in the array `v` exceed a certain threshold. | |
| The threshold is computed as `alpha` times the maximum value in `v`. | |
| Args: | |
| - pos (numpy.ndarray): Array of positions. | |
| - v (numpy.ndarray): 1D array of values, e.g., intensities. | |
| - alpha (float, optional): A scaling factor for the threshold. Defaults to `alpha_max`. | |
| Returns: | |
| - numpy.ndarray: Array of positions where corresponding values in `v` exceed the threshold. | |
| """ | |
| if len(v): | |
| idx = (v>=alpha*np.max(v)) | |
| return np.array(pos[idx]) | |
| else: | |
| return np.array([], dtype=np.int32) | |
| def analyse_celldata(cell_data, config): | |
| """ | |
| Analyse the provided cell data to extract focus-related information. | |
| Args: | |
| cd (CellData): An instance of the CellData class containing path data information. | |
| config (dictionary): Configuration dictionary containing 'peak_threshold' and 'threshold_type' | |
| 'peak_threshold' (float) - threshold for calling peaks as foci | |
| 'threshold_type' (str) = 'per-trace', 'per-foci' | |
| Returns: | |
| tuple: A tuple containing: | |
| - foci_rel_intensity (list): List of relative intensities for the detected foci. | |
| - foci_pos (list): List of absolute positions of the detected foci. | |
| - foci_pos_index (list): List of indices of the detected foci. | |
| - screened_foci_data (list): List of RemovedPeakData indicating positions of removed peaks and the index of the larger peak | |
| - trace_median_intensities (list): Per-trace median intensity | |
| - trace_thresholds (list): Per-trace absolute threshold for calling peaks as foci | |
| """ | |
| foci_abs_intensity = [] | |
| foci_pos = [] | |
| foci_pos_index = [] | |
| screened_foci_data = [] | |
| trace_median_intensities = [] | |
| trace_thresholds = [] | |
| peak_threshold = config['peak_threshold'] | |
| threshold_type = config['threshold_type'] | |
| if threshold_type == 'per-trace': | |
| """ | |
| Call extracted peaks as foci if intensity - trace_mean > peak_threshold * (trace_max_foci_intensity - trace_mean) | |
| """ | |
| for path_data in cell_data.pathdata_list: | |
| peaks = np.array(path_data.peaks, dtype=np.int32) | |
| # Normalize extracted fluorescent intensities by subtracting mean (and dividing | |
| # by standard deviation - note that the latter should have no effect on the results). | |
| h = np.array(path_data.o_intensity) | |
| h = h - np.mean(h) | |
| h = h/np.std(h) | |
| # Extract foci according to criterion | |
| foci_idx = focus_criterion(peaks, h[peaks], peak_threshold) | |
| # | |
| removed_peaks = path_data.removed_peaks | |
| removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32) | |
| if len(peaks): | |
| trace_thresholds.append((1-peak_threshold)*np.mean(path_data.o_intensity) + peak_threshold*np.max(np.array(path_data.o_intensity)[peaks])) | |
| else: | |
| trace_thresholds.append(None) | |
| if len(removed_peaks): | |
| if len(peaks): | |
| threshold = (1-peak_threshold)*np.mean(path_data.o_intensity) + peak_threshold*np.max(np.array(path_data.o_intensity)[peaks]) | |
| else: | |
| threshold = float('-inf') | |
| removed_peak_heights = np.array(path_data.o_intensity)[removed_peaks_idx] | |
| screened_foci_idx = np.where(removed_peak_heights>threshold)[0] | |
| screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx]) | |
| else: | |
| screened_foci_data.append([]) | |
| pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length | |
| foci_pos.append(pos_abs) | |
| foci_abs_intensity.append(np.array(path_data.o_intensity)[foci_idx]) | |
| foci_pos_index.append(foci_idx) | |
| trace_median_intensities.append(np.median(path_data.o_intensity)) | |
| elif threshold_type == 'per-cell': | |
| """ | |
| Call extracted peaks as foci if intensity - trace_mean > peak_threshold * max(intensity - trace_mean) | |
| """ | |
| max_cell_intensity = float("-inf") | |
| for path_data in cell_data.pathdata_list: | |
| # Normalize extracted fluorescent intensities by subtracting mean (and dividing | |
| # by standard deviation - note that the latter should have no effect on the results). | |
| h = np.array(path_data.o_intensity) | |
| h = h - np.mean(h) | |
| max_cell_intensity = max(max_cell_intensity, np.max(h)) | |
| for path_data in cell_data.pathdata_list: | |
| peaks = np.array(path_data.peaks, dtype=np.int32) | |
| # Normalize extracted fluorescent intensities by subtracting mean (and dividing | |
| # by standard deviation - note that the latter should have no effect on the results). | |
| h = np.array(path_data.o_intensity) | |
| h = h - np.mean(h) | |
| foci_idx = peaks[h[peaks]>peak_threshold*max_cell_intensity] | |
| removed_peaks = path_data.removed_peaks | |
| removed_peaks_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32) | |
| trace_thresholds.append(np.mean(path_data.o_intensity) + peak_threshold*max_cell_intensity) | |
| if len(removed_peaks): | |
| threshold = np.mean(path_data.o_intensity) + peak_threshold*max_cell_intensity | |
| removed_peak_heights = np.array(path_data.o_intensity)[removed_peaks_idx] | |
| screened_foci_idx = np.where(removed_peak_heights>threshold)[0] | |
| screened_foci_data.append([removed_peaks[i] for i in screened_foci_idx]) | |
| else: | |
| screened_foci_data.append([]) | |
| pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length | |
| foci_pos.append(pos_abs) | |
| foci_abs_intensity.append(np.array(path_data.o_intensity)[foci_idx]) | |
| foci_pos_index.append(foci_idx) | |
| trace_median_intensities.append(np.median(path_data.o_intensity)) | |
| else: | |
| raise NotImplementedError | |
| return foci_abs_intensity, foci_pos, foci_pos_index, screened_foci_data, trace_median_intensities, trace_thresholds | |
| def analyse_traces(all_paths, path_lengths, measured_trace_fluorescence, config): | |
| cd = process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=config['screening_distance']) | |
| return analyse_celldata(cd, config) | |