Source code for libgunshotmatch.peak

#!/usr/bin/env python3
#
#  peak.py
"""
Classes representing peaks, and functions for peak filtering.
"""
#
#  Copyright © 2020-2023 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
#

# stdlib
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Mapping, Optional, Sequence, Type, Union

# 3rd party
import numpy
import pandas  # type: ignore[import-untyped]
import sdjson
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.stringlist import StringList
from domdf_python_tools.typing import PathLike
from pyms.BillerBiemann import num_ions_threshold
from pyms.DPA.Alignment import exprl2alignment
from pyms.DPA.PairwiseAlignment import Alignment, PairwiseAlignment, align_with_tree
from pyms.Experiment import Experiment
from pyms.IonChromatogram import IonChromatogram
from pyms.Noise.Analysis import window_analyzer
from pyms.Peak.Class import AbstractPeak, ICPeak, Peak
from pyms.Peak.List import composite_peak
from pyms.Peak.List.Function import sele_peaks_by_rt
from pyms.Spectrum import MassSpectrum
from pyms_nist_search import SearchResult

if TYPE_CHECKING:
	# this package
	from libgunshotmatch.project import Project

__all__ = (
		"PeakList",
		"QualifiedPeak",
		"QualifiedPeakList",
		"align_peaks",
		"filter_aligned_peaks",
		"filter_peaks",
		"peak_from_dict",
		"write_alignment",
		"write_project_alignment",
		"base_peak_mass",
		)


[docs]class QualifiedPeak(Peak): """ A Peak that has been identified using NIST MS Search and contains a list of possible identities. :param rt: Retention time. :param ms: The mass spectrum at the apex of the peak. :param minutes: Retention time units flag. If :py:obj:`True`, retention time is in minutes; if :py:obj:`False` retention time is in seconds. :param outlier: Whether the peak is an outlier. :param hits: List of possible identities for the peak. :param peak_number: Optional numerical identifier for the :class:`~pyms.Peak.Class.Peak`, such as in an :class:`~.pyms.DPA.Alignment.Alignment`. """ #: List of possible identities for the peak. hits: List[SearchResult] #: Optional numerical identifier for the peak, such as in an :class:`~.pyms.DPA.Alignment.Alignment`. peak_number: Optional[int] def __init__( self, rt: float = 0.0, ms: Optional[MassSpectrum] = None, minutes: bool = False, outlier: bool = False, hits: Optional[List[SearchResult]] = None, peak_number: Optional[int] = None, ): Peak.__init__(self, rt, ms, minutes, outlier) if hits is not None: if not isinstance(hits, list) or not isinstance(hits[0], SearchResult): raise TypeError("'hits' must be a list of SearchResult objects") if hits is None: self.hits = [] else: self.hits = hits self.peak_number = peak_number def __new__( # noqa: D102 cls, rt: float = 0.0, ms: Optional[MassSpectrum] = None, minutes: bool = False, outlier: bool = False, hits: Optional[List[SearchResult]] = None, peak_number: Optional[int] = None, ): # Overrides __new__ method which warns if the Peak is for an IC (not applicable here) obj = object.__new__(cls) obj.__init__(rt, ms, minutes, outlier, hits, peak_number) # type: ignore[misc] return obj def __eq__(self, other) -> bool: # noqa: MAN001 """ Return whether this QualifiedPeak object is equal to another object. :param other: The other object to test equality with. """ if isinstance(other, self.__class__): return ( self.UID == other.UID and self.bounds == other.bounds and self.rt == other.rt and self.mass_spectrum == other.mass_spectrum and self.area == other.area and self.hits == other.hits and self.peak_number == other.peak_number ) return NotImplemented
[docs] @classmethod def from_peak(cls: Type["QualifiedPeak"], peak: Peak) -> "QualifiedPeak": """ Construct :class:`~.QualifiedPeak` from a :class:`~pyms.Peak.Class.Peak`. The resulting :class:`~.QualifiedPeak` will not have :attr:`~.hits` or :attr:`~.peak_number` set, but those attributes can be set after calling this method. :param peak: """ if not isinstance(peak, AbstractPeak): raise TypeError("'peak' must be a Peak object") if isinstance(peak, ICPeak): new_peak = cls(peak.rt, peak.ic_mass, False, peak.is_outlier) # type: ignore[arg-type] # TODO else: new_peak = cls(peak.rt, peak.mass_spectrum, False, peak.is_outlier) assert peak.area is not None assert peak.bounds is not None new_peak.area = peak.area new_peak.bounds = peak.bounds new_peak._UID = peak.UID return new_peak
def __repr__(self) -> str: return f"<Qualified Peak: {self.rt}>" def __str__(self) -> str: return self.__repr__()
[docs] def to_dict(self) -> Dict[str, Any]: """ Returns a dictionary representation of this peak. All keys are native, JSON-serializable, Python objects. """ try: ion_areas: Union[Dict, float] = self.ion_areas except ValueError: ion_areas = 0 return { "UID": self.UID, "area": self.area, "bounds": self.bounds, "ion_areas": ion_areas, "is_outlier": self.is_outlier, "mass_spectrum": { "intensity_list": self.mass_spectrum.intensity_list, "mass_list": self.mass_spectrum.mass_list, }, "rt": self.rt, "hits": [hit.to_dict() for hit in self.hits], "peak_number": self.peak_number, }
[docs] @classmethod def from_dict(cls: Type["QualifiedPeak"], d: Mapping[str, Any]) -> "QualifiedPeak": """ Construct a :class:`~.QualifiedPeak` from a dictionary. :param d: """ hits = [SearchResult.from_dict(hit) for hit in d["hits"]] peak_obj = QualifiedPeak( d["rt"], MassSpectrum(**d["mass_spectrum"]), outlier=d["is_outlier"], hits=hits, peak_number=d["peak_number"], ) peak_obj.bounds = d["bounds"] peak_obj._UID = d["UID"] peak_obj.area = d["area"] if d["ion_areas"]: peak_obj.ion_areas = d["ion_areas"] return peak_obj
# @prettify_docstrings
[docs]class PeakList(List[Peak]): """ Represents a list of peaks. .. autosummary-widths:: 35/100 """ #: String identifier for the datafile the peaks were detected in. datafile_name: Optional[str] = None def __repr__(self) -> str: if self.datafile_name: return f"{self.__class__.__name__}(datafile={self.datafile_name}; <{len(self)} peaks>)" else: return f"{self.__class__.__name__}(<{len(self)} peaks>)" def __str__(self) -> str: return self.__repr__()
[docs] def to_list(self) -> List[Dict[str, Any]]: """ Return a list of pure-Python dictionaries representing the peaks and their mass spectra. """ peaks_as_pure_list: List[Dict[str, Any]] = [] for peak in self: if peak is None: peaks_as_pure_list.append(None) continue try: ion_areas = peak.ion_areas except ValueError: ion_areas = None peaks_as_pure_list.append({ "UID": peak.UID, "area": peak.area, "bounds": peak.bounds, "ion_areas": ion_areas, "is_outlier": peak.is_outlier, "mass_spectrum": { "intensity_list": peak.mass_spectrum.intensity_list, "mass_list": peak.mass_spectrum.mass_list, }, "rt": peak.rt, }) return peaks_as_pure_list
# @prettify_docstrings
[docs]class QualifiedPeakList(List[QualifiedPeak]): """ Represents a list of qualified peaks. """ #: String identifier for the datafile the peaks were detected in. datafile_name: Optional[str] = None def __repr__(self) -> str: if self.datafile_name: return f"{self.__class__.__name__}(datafile={self.datafile_name}; <{len(self)} peaks>)" else: return f"{self.__class__.__name__}(<{len(self)} peaks>)" def __str__(self) -> str: return self.__repr__()
[docs] def to_list(self) -> List[Dict[str, Any]]: """ Return a list of pure-Python dictionaries representing the peaks and their mass spectra. """ return [qp.to_dict() for qp in self]
[docs]def filter_peaks( peak_list: List[Peak], tic: IonChromatogram, noise_filter: bool = True, noise_threshold: int = 2, base_peak_filter: Collection[int] = (73, 147), rt_range: Optional[Sequence[float]] = None, ) -> PeakList: """ Filter a list of peaks to remove noise and peaks due to e.g. column bleed. :param peak_list: :param tic: The TIC of the GC-MS data from which these peaks were identified. :param noise_filter: Whether to perform automatic noise filtering of the peak list. :param noise_threshold: The minimum number of ions that must have intensities above the noise floor, otherwise the peak is excluded. :param base_peak_filter: Peaks whose base peak is at one of the listed masses (*m/z*) are excluded. :param rt_range: Optional retention time range (in minutes) to filter the peak list to. """ if noise_filter: # Filtering peak lists with automatic noise filtering noise_level = window_analyzer(tic) # should we also do rel_threshold() here? # https://pymassspec.readthedocs.io/en/master/pyms/BillerBiemann.html#pyms.BillerBiemann.rel_threshold peak_list = num_ions_threshold(peak_list, noise_threshold, noise_level) if rt_range is not None: # Crop time range peak_list = PeakList(sele_peaks_by_rt(peak_list, [f"{rt_range[0]}m", f"{rt_range[1]}m"])) final_peak_list = PeakList() for peak in peak_list: # Get mass and intensity lists for the mass spectrum at the apex of the peak apex_mass_list = peak.mass_spectrum.mass_list apex_mass_spec = peak.mass_spectrum.mass_spec # Determine the intensity of the base peak in the mass spectrum base_peak_intensity = max(apex_mass_spec) # Determine the index of the base peak in the mass spectrum base_peak_index = [ index for index, intensity in enumerate(apex_mass_spec) if intensity == base_peak_intensity ][0] # Finally, determine the mass of the base peak base_peak_mass = apex_mass_list[base_peak_index] # skip the peak if the base peak is at e.g. m/z 73, i.e. septum bleed if base_peak_mass in base_peak_filter: # method.base_peak_filter: continue final_peak_list.append(peak) return final_peak_list
@sdjson.register_encoder(numpy.int64) def _encode_numpy_int64(obj: numpy.int64) -> int: # Necessary as to_dict() sometimes includes int64 which isn't natively serializable. return int(obj)
[docs]def align_peaks( peaks: List[PeakList], rt_modulation: float = 2.5, gap_penalty: float = 0.3, min_peaks: int = 1, ) -> Alignment: """ Perform peak alignment. :param peaks: List of list of identified peaks. Each :class:`~.PeakList` must have its :attr:`~.PeakList.datafile_name` attribute set. :param rt_modulation: Retention time tolerance parameter for pairwise alignments. :param gap_penalty: Gap parameter for pairwise alignments. :param min_peaks: Minimum number of peaks required for the alignment position to survive filtering. If set to ``-1`` the number of repeats in the project are used. :rtype: .. latex:clearpage:: """ print("\nAligning\n") expr_list = [] for peak_list in peaks: if peak_list.datafile_name is None: raise ValueError("Cannot align peaks with PeakList.datafile_name unset") else: expr_list.append(Experiment(peak_list.datafile_name, peak_list)) F1: List[Alignment] = exprl2alignment(expr_list) # F1: List[Alignment] = exprl2alignment([Experiment(d["datafile"].name, d["peak_list"]) for d in expr_list]) T1 = PairwiseAlignment( F1, rt_modulation, gap_penalty, ) if min_peaks == -1: min_peaks = len(expr_list) A1: Alignment = align_with_tree(T1, min_peaks=min_peaks) return A1
def _format_rt(rt: Optional[float]) -> str: return "NA" if rt is None or numpy.isnan(rt) else f"{rt:.3f}" def _format_area(area: Optional[float]) -> str: return "NA" if area is None else f"{area:.0f}" def _alignment_write_csv( project: "Project", output_dir_p: PathPlus, ) -> None: # Sort expr_code and peakpos into order from datafile_data desired_order = list(project.datafile_data) sort_map = [project.alignment.expr_code.index(code) for code in desired_order] expr_code = [project.alignment.expr_code[idx] for idx in sort_map] peakpos = [project.alignment.peakpos[idx] for idx in sort_map] assert desired_order == expr_code # write headers header = ["UID", "RTavg", *(f'"{item}"' for item in project.datafile_data)] rt_stringlist = StringList([','.join(header)]) area_stringlist = StringList([','.join(header)]) # for each alignment position write alignment's peak and area for peak_idx in range(len(peakpos[0])): # loop through peak lists (rows) rts, areas, new_peak_list = [], [], [] for row in peakpos: peak = row[peak_idx] if peak is None: rts.append(None) areas.append(None) else: rts.append(peak.rt / 60) areas.append(peak.area) new_peak_list.append(peak) compo_peak = composite_peak(new_peak_list) if compo_peak is None: continue uid, mean_rt = compo_peak.UID, f"{float(compo_peak.rt / 60):.3f}" rt_stringlist.append(','.join([uid, mean_rt, *map(_format_rt, rts)])) area_stringlist.append(','.join([uid, mean_rt, *map(_format_area, areas)])) (output_dir_p / f"{project.name}_alignment_rt.csv").write_lines(rt_stringlist) (output_dir_p / f"{project.name}_alignment_area.csv").write_lines(area_stringlist)
[docs]def write_project_alignment( project: "Project", output_dir: PathLike, require_all_datafiles: bool = False, ) -> None: """ Write the alignment data (retention times, peak areas, mass spectra) to disk. The output files are as follows: * :file:`{{project.name}}_alignment_rt.csv`, containing the aligned retention times. * :file:`{{project.name}}_alignment_area.csv`, containing the peak areas for the corresponding aligned retention times. * :file:`{{project.name}}_alignment_rt.json`, containing the aligned retention times. * :file:`{{project.name}}_alignment_area.json`, containing the peak areas for the corresponding aligned retention times. * :file:`{{project.name}}_alignment_ms.json`, containing the mass spectra for the corresponding aligned retention times. :param project: :param output_dir: Directory to store the output files in. :param require_all_datafiles: Whether the peak must be present in all experiments to be included in the data frame. :rtype: .. versionadded:: 0.12.0 Added as an alternative to :func:`~.write_alignment`. This function sorts the columns to match the order of ``project.datafile_data``. """ output_dir_p = PathPlus(output_dir) _alignment_write_csv(project, output_dir_p) rt_alignment = project.alignment.get_peak_alignment(require_all_expr=require_all_datafiles) rt_alignment_filename = output_dir_p / f"{project.name}_alignment_rt.json" rt_alignment_filename.write_clean(rt_alignment.to_json(indent=2)) area_alignment = project.alignment.get_area_alignment(require_all_expr=require_all_datafiles) area_alignment_filename = output_dir_p / f"{project.name}_alignment_area.json" area_alignment_filename.write_clean(area_alignment.to_json(indent=2)) ms_alignment = project.alignment.get_ms_alignment(require_all_expr=require_all_datafiles) # ms_alignment.to_json(output_dir_p / 'alignment_ms.json') alignment_ms_filename = (output_dir_p / f"{project.name}_alignment_ms.json") alignment_ms_filename.dump_json( ms_alignment.to_dict(), json_library=sdjson, # type: ignore[arg-type] indent=2, )
[docs]def write_alignment( alignment: Alignment, project_name: str, output_dir: PathLike, require_all_datafiles: bool = False, ) -> None: """ Write the alignment data (retention times, peak areas, mass spectra) to disk. The output files are as follows: * :file:`{{project_name}}_alignment_rt.csv`, containing the aligned retention times. * :file:`{{project_name}}_alignment_area.csv`, containing the peak areas for the corresponding aligned retention times. * :file:`{{project_name}}_alignment_rt.json`, containing the aligned retention times. * :file:`{{project_name}}_alignment_area.json`, containing the peak areas for the corresponding aligned retention times. * :file:`{{project_name}}_alignment_ms.json`, containing the mass spectra for the corresponding aligned retention times. :param alignment: :param project_name: The name of the project. Prefixed to all filenames. :param output_dir: Directory to store the output files in. :param require_all_datafiles: Whether the peak must be present in all experiments to be included in the data frame. """ output_dir_p = PathPlus(output_dir) alignment.write_csv( output_dir_p / f"{project_name}_alignment_rt.csv", output_dir_p / f"{project_name}_alignment_area.csv", ) rt_alignment = alignment.get_peak_alignment(require_all_expr=require_all_datafiles) rt_alignment_filename = output_dir_p / f"{project_name}_alignment_rt.json" rt_alignment_filename.write_clean(rt_alignment.to_json(indent=2)) area_alignment = alignment.get_area_alignment(require_all_expr=require_all_datafiles) area_alignment_filename = output_dir_p / f"{project_name}_alignment_area.json" area_alignment_filename.write_clean(area_alignment.to_json(indent=2)) ms_alignment = alignment.get_ms_alignment(require_all_expr=require_all_datafiles) # ms_alignment.to_json(output_dir_p / 'alignment_ms.json') alignment_ms_filename = (output_dir_p / f"{project_name}_alignment_ms.json") alignment_ms_filename.dump_json( ms_alignment.to_dict(), json_library=sdjson, # type: ignore[arg-type] )
[docs]def filter_aligned_peaks( alignment: Alignment, top_n_peaks: int = 80, min_peak_area: float = 0, ) -> pandas.DataFrame: """ Filter aligned peaks by minimum average peak area, and to the top ``n`` largest peaks. :param alignment: :param top_n_peaks: Filter to the largest ``n`` peaks. If ``0`` all peaks are included. :param min_peak_area: Exclude aligned peaks with an average peak area below this threshold. :returns: :class:`pandas.DataFrame` giving the retention times of the aligned peaks. """ # Get peak area and retention times from Alignment area_alignment: pandas.DataFrame = alignment.get_area_alignment(require_all_expr=False) rt_alignment: pandas.DataFrame = alignment.get_peak_alignment(require_all_expr=False) # Calculate average peak area for each of the aligned peaks area_alignment["mean"] = area_alignment[alignment.expr_code].mean(axis=1) area_alignment["stdev"] = area_alignment[alignment.expr_code].std(axis=1) # print(area_alignment[["mean", "stdev"]]) # Sort mean_peak_areas from largest (top) to smallest area_alignment = area_alignment.sort_values(by="mean") ######## # Get indices of largest n peaks based on `ident_top_peaks` top_peaks_indices = [] if top_n_peaks: print(f"Filtering to the largest {top_n_peaks} peaks with an average peak area above {min_peak_area}") # print("tail of area_alignment=", area_alignment.tail(top_n_peaks)) # Limit to the largest `top_n_peaks` peaks for peak_no, areas in area_alignment.tail(top_n_peaks).iterrows(): # Ignore peak if average peak area is less then min_peak_area if areas["mean"] >= min_peak_area: top_peaks_indices.append(peak_no) else: print(f"Filtering to peaks with an average peak area above {min_peak_area}") for peak_no, areas in area_alignment.iterrows(): # Ignore peak if average peak area is less then min_peak_area if areas["mean"] >= min_peak_area: top_peaks_indices.append(peak_no) # Remove peaks from rt_alignment if they are not in top_peaks_indices, # i.e. they are one of the largest n largest peaks rt_alignment = rt_alignment.filter(top_peaks_indices, axis=0) return rt_alignment
[docs]def peak_from_dict(d: Dict[str, Any]) -> Peak: """ Construct a :class:`~pyms.Peak.Class.Peak` from a dictionary. :param d: :rtype: .. latex:clearpage:: """ peak_obj = Peak(d["rt"], MassSpectrum(**d["mass_spectrum"]), outlier=d["is_outlier"]) peak_obj.bounds = d["bounds"] peak_obj._UID = d["UID"] peak_obj.area = d["area"] if d["ion_areas"]: peak_obj.ion_areas = d["ion_areas"] return peak_obj
def _to_peak_list(a_list: List[Peak]) -> PeakList: """ Internal utility to coerce a list of peaks to an actual :class:`~.PeakList`. :param a_list: """ if isinstance(a_list, PeakList): return a_list else: return PeakList(a_list)
[docs]def base_peak_mass(peak: Peak) -> float: """ Returns the mass of the largest fragment in the peak's mass spectrum. :param peak: .. versionadded:: v0.11.0 """ apex_mass_list = peak.mass_spectrum.mass_list apex_mass_spec = peak.mass_spectrum.mass_spec # Determine the intensity of the base peak in the mass spectrum base_peak_intensity = max(apex_mass_spec) # Determine the index of the base peak in the mass spectrum base_peak_index = apex_mass_spec.index(base_peak_intensity) # Finally, determine the mass of the base peak return apex_mass_list[base_peak_index]