Source code for ms_entropy.spectra.tools

#!/usr/bin/env python3
import numpy as np


[docs] def clean_spectrum( peaks, min_mz: float = -1.0, max_mz: float = -1.0, noise_threshold: float = 0.01, min_ms2_difference_in_da: float = 0.05, min_ms2_difference_in_ppm: float = -1.0, max_peak_num: int = -1, normalize_intensity: bool = True, **kwargs, ) -> np.ndarray: """ Clean, centroid, and normalize a spectrum with the following steps: 1. Remove empty peaks (m/z <= 0 or intensity <= 0). 2. Remove peaks with m/z >= max_mz or m/z <= min_mz. 3. Centroid the spectrum by merging peaks within min_ms2_difference_in_da. 4. Remove peaks with intensity < noise_threshold * max_intensity. 5. Keep only the top max_peak_num peaks. 6. Normalize the intensity to sum to 1. Parameters ---------- peaks : np.ndarray in shape (n_peaks, 2), dtype=np.float32 or list[list[float, float]] A 2D array of shape (n_peaks, 2) where the first column is m/z and the second column is intensity. min_mz : float, optional The minimum m/z to keep. Defaults to None, which will skip removing peaks with m/z <= min_mz. max_mz : float, optional The maximum m/z to keep. Defaults to None, which will skip removing peaks with m/z >= max_mz. noise_threshold : float, optional The minimum intensity to keep. Defaults to 0.01, which will remove peaks with intensity < 0.01 * max_intensity. min_ms2_difference_in_da : float, optional The minimum m/z difference between two peaks in the resulting spectrum. Defaults to 0.05, which will merge peaks within 0.05 Da. If a negative value is given, the min_ms2_difference_in_ppm will be used instead. min_ms2_difference_in_ppm : float, optional The minimum m/z difference between two peaks in the resulting spectrum. Defaults to -1, which will use the min_ms2_difference_in_da instead. If a negative value is given, the min_ms2_difference_in_da will be used instead. ** Note either min_ms2_difference_in_da or min_ms2_difference_in_ppm must be positive. If both are positive, min_ms2_difference_in_ppm will be used. ** max_peak_num : int, optional The maximum number of peaks to keep. Defaults to None, which will keep all peaks. normalize_intensity : bool, optional Whether to normalize the intensity to sum to 1. Defaults to True. If False, the intensity will be kept as is. **kwargs : optional Those keyword arguments will be ignored. _ Returns ------- np.ndarray in shape (n_peaks, 2), dtype=np.float32 The cleaned spectrum will be guaranteed to be sorted by m/z in ascending order. """ # Check the input spectrum and convert it to numpy array with shape (n, 2) and dtype np.float32. peaks = np.asarray(peaks, dtype=np.float32, order="C") if len(peaks) == 0: return np.zeros((0, 2), dtype=np.float32) assert peaks.ndim == 2 and peaks.shape[1] == 2, "The input spectrum must be a 2D numpy array with shape (n, 2)." # Step 1. Remove empty peaks (m/z <= 0 or intensity <= 0). peaks = peaks[np.bitwise_and(peaks[:, 0] > 0, peaks[:, 1] > 0)] # Step 2. Remove peaks with m/z >= max_mz or m/z <= min_mz. if min_mz is not None and min_mz > 0: peaks = peaks[peaks[:, 0] >= min_mz] if max_mz is not None and max_mz > 0: peaks = peaks[peaks[:, 0] <= max_mz] if peaks.shape[0] == 0: return np.zeros((0, 2), dtype=np.float32) # Step 3. Centroid the spectrum by merging peaks within min_ms2_difference_in_da. # The peaks will be sorted by m/z in ascending order. if min_ms2_difference_in_ppm > 0: peaks = centroid_spectrum(peaks, ms2_da=-1, ms2_ppm=min_ms2_difference_in_ppm) elif min_ms2_difference_in_da > 0: peaks = centroid_spectrum(peaks, ms2_da=min_ms2_difference_in_da, ms2_ppm=-1) else: raise ValueError("Either min_ms2_difference_in_da or min_ms2_difference_in_ppm must be positive.") # Step 4. Remove peaks with intensity < noise_threshold * max_intensity. if noise_threshold is not None: peaks = peaks[peaks[:, 1] >= noise_threshold * np.max(peaks[:, 1])] # Step 5. Keep only the top max_peak_num peaks. if max_peak_num is not None and max_peak_num > 0: # Sort the spectrum by intensity. peaks = peaks[np.argsort(peaks[:, 1])[-max_peak_num:]] # Sort the spectrum by m/z. peaks = peaks[np.argsort(peaks[:, 0])] # Step 6. Normalize the intensity to sum to 1. if normalize_intensity: spectrum_sum = np.sum(peaks[:, 1]) if spectrum_sum > 0: peaks[:, 1] /= spectrum_sum else: peaks = np.zeros((0, 2), dtype=np.float32) return peaks
def centroid_spectrum(peaks: np.ndarray, ms2_da: float = -1, ms2_ppm: float = -1) -> np.ndarray: """ Centroid a spectrum by merging peaks within the +/- ms2_ppm or +/- ms2_da. Parameters ---------- peaks : np.ndarray in shape (n_peaks, 2), dtype=np.float32 A 2D array of shape (n_peaks, 2) where the first column is m/z and the second column is intensity. ms2_ppm : float, optional The m/z tolerance in ppm. Defaults to -1, which will use ms2_da instead. ms2_da : float, optional The m/z tolerance in Da. Defaults to -1, which will use ms2_ppm instead. **Note: ms2_ppm and ms2_da cannot be both negative, if so, an Exception will be raised. If both are positive, ms2_da will be used.** Returns ------- np.ndarray in shape (n_peaks, 2), dtype=np.float32 The resulting spectrum after centroiding will be guarantee to have been sorted by m/z, and the difference between two peaks will be >= ms2_ppm or >= ms2_da. """ # Sort the peaks by m/z. peaks = peaks[np.argsort(peaks[:, 0])] is_centroided: int = _check_centroid(peaks, ms2_da=ms2_da, ms2_ppm=ms2_ppm) while is_centroided == 0: peaks = _centroid_spectrum(peaks, ms2_da=ms2_da, ms2_ppm=ms2_ppm) is_centroided = _check_centroid(peaks, ms2_da=ms2_da, ms2_ppm=ms2_ppm) return peaks def _centroid_spectrum(peaks: np.ndarray, ms2_da: float = -1, ms2_ppm: float = -1) -> np.ndarray: """Centroid a spectrum by merging peaks within the +/- ms2_ppm or +/- ms2_da.""" # Construct a new spectrum to avoid memory reallocation. peaks_new = np.zeros((peaks.shape[0], 2), dtype=np.float32) peaks_new_len = 0 # Get the intensity argsort order. intensity_order = np.argsort(peaks[:, 1]) mz_delta_allowed_left = ms2_da mz_delta_allowed_right = ms2_da # Iterate through the peaks from high to low intensity. for idx in intensity_order[::-1]: if ms2_ppm > 0: mz_delta_allowed_left = peaks[idx, 0] * ms2_ppm * 1e-6 # For the right boundary, the mz_delta_allowed_right = peaks[right_idx, 0] * ms2_ppm * 1e-6 = peaks[idx, 0] / (1 - ms2_ppm * 1e-6) - peaks[idx, 0] mz_delta_allowed_right = peaks[idx, 0] / (1 - ms2_ppm * 1e-6) - peaks[idx, 0] if peaks[idx, 1] > 0: # Find the left boundary. left_idx = idx - 1 while left_idx >= 0 and peaks[idx, 0] - peaks[left_idx, 0] <= mz_delta_allowed_left: left_idx -= 1 left_idx += 1 # Find the right boundary. right_idx = idx + 1 while right_idx < peaks.shape[0] and peaks[right_idx, 0] - peaks[idx, 0] <= mz_delta_allowed_right: right_idx += 1 # Merge the peaks within the boundary. The new m/z is the weighted average of the m/z and intensity. intensity_sum = np.sum(peaks[left_idx:right_idx, 1]) intensity_weighted_mz_sum = np.sum(peaks[left_idx:right_idx, 1] * peaks[left_idx:right_idx, 0]) peaks_new[peaks_new_len, 0] = intensity_weighted_mz_sum / intensity_sum peaks_new[peaks_new_len, 1] = intensity_sum peaks_new_len += 1 # Set the intensity of the merged peaks to 0. peaks[left_idx:right_idx, 1] = 0 # Return the new spectrum. peaks_new = peaks_new[:peaks_new_len] return peaks_new[np.argsort(peaks_new[:, 0])] def _check_centroid(peaks: np.ndarray, ms2_da: float = -1, ms2_ppm: float = -1) -> int: """Check if the spectrum is centroided. 0 for False and 1 for True.""" if peaks.shape[0] <= 1: return 1 if ms2_ppm >= 0: # Use ms2_ppm to check if the spectrum is centroided. return 1 if np.all(np.diff(peaks[:, 0]) >= peaks[1:, 0] * ms2_ppm * 1e-6) else 0 elif ms2_da >= 0: # Use ms2_da to check if the spectrum is centroided. return 1 if np.all(np.diff(peaks[:, 0]) >= ms2_da) else 0