Source code for libgunshotmatch.datafile

#!/usr/bin/env python3
#
#  datafile.py
"""
Represents a parsed datafile.
"""
#
#  Copyright © 2020-2023 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
#

# stdlib
import getpass
import os
import socket
from datetime import datetime
from statistics import mean, median
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Tuple, Type, Union

# 3rd party
import attr
import pyms.Noise.SavitzkyGolay
import pyms.TopHat
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.typing import PathLike
from enum_tools import IntEnum
from pyms.GCMS.Class import GCMS_data
from pyms.IntensityMatrix import IntensityMatrix, build_intensity_matrix_i

# this package
from libgunshotmatch import gzip_util
from libgunshotmatch.method import SavitzkyGolayMethod
from libgunshotmatch.peak import PeakList, QualifiedPeak, _to_peak_list, peak_from_dict

__all__ = ("Datafile", "FileType", "Repeat", "get_info_from_gcms_data", "GCMSDataInfo")


[docs]class FileType(IntEnum): """ Represents the input datafile types supported by PyMassSpec. """ #: JCAMP-DX (https://iupac.org/wp-content/uploads/2021/08/JCAMP-DX_MS_1994.pdf) JDX = 0 #: mzML (https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3073315/) MZML = 1 #: Analytical Data Interchange Format for Mass Spectrometry (https://www.astm.org/e1947-98r22.html) ANDI = 2 @classmethod def _attrs_convert(cls: Type["FileType"], value: int) -> "FileType": if value in iter(cls): # type: ignore[operator] return cls(value) raise ValueError(f"Unrecognised file format {value}")
[docs]@attr.define class Datafile: """ Represents a single datafile in a project. :default user: taken from the currently logged-in us :param original_filetype: The filetype of the file the :class:`~.Datafile` was created from. :default device: taken from the current device's hostname :default date_created: is the current date and time :default date_modified: is the current date and time """ #: The name of the :class:`~.Datafile`. name: str #: The filename of the file the :class:`~.Datafile` was created from. original_filename: str original_filetype: FileType = attr.field( converter=FileType._attrs_convert, # type: ignore[misc] # doesn't like the converter ) """ The filetype of the file the :class:`~.Datafile` was created from. .. latex:clearpage:: """ #: A description of the :class:`~.Datafile`. description: str = attr.field(default='') #: PyMassSpec :class:`~pyms.IntensityMatrix.IntensityMatrix` object. intensity_matrix: Optional[IntensityMatrix] = attr.field(default=None) #: The user who created the :class:`~.Datafile`. user: str = attr.field(factory=getpass.getuser) #: The device that created the :class:`~.Datafile`. device: str = attr.field(factory=socket.gethostname) #: The date and time the :class:`~.Datafile` was created. date_created: datetime = attr.field(factory=datetime.now) #: The date and time the :class:`~.Datafile` was last modified. date_modified: datetime = attr.field(factory=datetime.now) #: File format version version: int = attr.field(default=1)
[docs] @classmethod def new(cls: Type["Datafile"], name: str, filename: PathLike) -> "Datafile": """ Construct a new :class:`~.Datafile` from a file. :param name: The name of the Datafile :param filename: """ date_modified = date_created = datetime.now() filename_p = PathPlus(filename) if filename_p.suffix.lower() == ".jdx": original_filetype = FileType.JDX elif filename_p.suffix.lower() == ".cdf": original_filetype = FileType.ANDI elif filename_p.suffix.lower() == ".mzml": original_filetype = FileType.MZML else: raise ValueError(f"Unrecognised file format for file {filename}") return cls( name=name, date_created=date_created, date_modified=date_modified, original_filename=filename_p.resolve().as_posix(), original_filetype=original_filetype, )
[docs] def load_gcms_data(self, filename: Optional[PathLike] = None) -> GCMS_data: """ Load GC-MS data from the datafile. :param filename: Alternative filename to load the data from. Useful if the file has moved since the :class:`~.Datafile` was created. .. versionchanged:: 0.4.0 Added the ``filename`` attribute. """ filename = PathPlus(filename or self.original_filename) if self.original_filetype == FileType.JDX: # 3rd party from pyms.GCMS.IO.JCAMP import JCAMP_reader gcms_data = JCAMP_reader(filename) elif self.original_filetype == FileType.MZML: # 3rd party from pyms.GCMS.IO.MZML import mzML_reader gcms_data = mzML_reader(filename) elif self.original_filetype == FileType.ANDI: # 3rd party from pyms.GCMS.IO.ANDI import ANDI_reader gcms_data = ANDI_reader(filename) else: # Shouldn't get here due to filetype validation at attrs' end raise ValueError(f"Unrecognised file format {self.original_filetype._name_}") return gcms_data
[docs] def prepare_intensity_matrix( self, gcms_data: GCMS_data, savitzky_golay: Union[bool, SavitzkyGolayMethod] = True, tophat: bool = True, tophat_structure_size: str = "1.5m", # Ignored if tophat=False crop_mass_range: Optional[Tuple[float, float]] = None, ) -> IntensityMatrix: """ Build an :class:`~pyms.IntensityMatrix.IntensityMatrix` for the datafile. :param gcms_data: :param savitzky_golay: Whether to perform Savitzky-Golay smoothing. :param tophat: Whether to perform Tophat baseline correction. :param tophat_structure_size: The structure size for Tophat baseline correction. :param crop_mass_range: The range of masses to which the GC-MS data should be limited to. """ intensity_matrix = build_intensity_matrix_i(gcms_data) # Show the m/z of the maximum and minimum bins print(f" Minimum m/z bin: {intensity_matrix.min_mass}") print(f" Maximum m/z bin: {intensity_matrix.max_mass}") # Crop masses if crop_mass_range is not None: # None means don't crop # min_mass, max_mass = 50, 400 min_mass, max_mass = crop_mass_range # Catch case where numbers are flipped if min_mass >= max_mass: raise ValueError( '\n'.join([ "Cannot crop mass range when `max mass` is less than `min mass`.\n'", "Did you put the numbers the wrong way around? The expected order is (<min>, <max>)", ]), ) if intensity_matrix.min_mass is not None and min_mass < intensity_matrix.min_mass: min_mass = intensity_matrix.min_mass if intensity_matrix.max_mass is not None and max_mass > intensity_matrix.max_mass: max_mass = intensity_matrix.max_mass intensity_matrix.crop_mass(min_mass, max_mass) # Perform Data filtering n_mz = intensity_matrix.size[1] # Iterate over each IC in the intensity matrix for index in range(n_mz): # print("\rWorking on IC#", index+1, ' ',end='') ic = intensity_matrix.get_ic_at_index(index) if savitzky_golay: # Perform Savitzky-Golay smoothing. # Note that Turbomass does not use smoothing for qualitative method. if isinstance(savitzky_golay, SavitzkyGolayMethod): ic = pyms.Noise.SavitzkyGolay.savitzky_golay( ic, window=savitzky_golay.window, degree=savitzky_golay.degree, ) else: ic = pyms.Noise.SavitzkyGolay.savitzky_golay(ic) if tophat: # Perform Tophat baseline correction # Top-hat baseline Correction seems to bring down noise, # retaining shapes, but keeps points on actual peaks # ic = tophat(ic, struct=method.tophat_struct) ic = pyms.TopHat.tophat(ic, struct=tophat_structure_size) # Set the IC in the intensity matrix to the filtered one intensity_matrix.set_ic_at_index(index, ic) self.intensity_matrix = intensity_matrix return intensity_matrix
[docs] def to_dict(self) -> Dict[str, Any]: """ Returns a dictionary representation of this :class:`~.Datafile`. All keys are native, JSON-serializable, Python objects. """ if self.intensity_matrix is None: im_as_dict = None else: im_as_dict = { "times": self.intensity_matrix.time_list, "masses": self.intensity_matrix.mass_list, "intensities": self.intensity_matrix.intensity_array.tolist(), } return { "name": self.name, "original_filename": self.original_filename, "original_filetype": int(self.original_filetype), "description": self.description, "intensity_matrix": im_as_dict, "user": self.user, "device": self.device, "date_created": self.date_created.isoformat(), "date_modified": self.date_modified.isoformat(), "version": self.version, }
[docs] @classmethod def from_dict(cls: Type["Datafile"], d: Mapping[str, Any]) -> "Datafile": """ Construct a :class:`~.Datafile` from a dictionary. :param d: """ im_as_dict = d["intensity_matrix"] if im_as_dict is None: intensity_matrix = None else: intensity_matrix = IntensityMatrix( time_list=im_as_dict["times"], mass_list=im_as_dict["masses"], intensity_array=im_as_dict["intensities"], ) optional_keys = {} if "user" in d: optional_keys["user"] = d["user"] if "device" in d: optional_keys["device"] = d["device"] if "date_created" in d: optional_keys["date_created"] = datetime.fromisoformat(d["date_created"]) if "date_modified" in d: optional_keys["date_modified"] = datetime.fromisoformat(d["date_modified"]) if "version" in d: optional_keys["version"] = d["version"] return cls( name=d["name"], original_filename=d["original_filename"], original_filetype=FileType(d["original_filetype"]), description=d["description"], intensity_matrix=intensity_matrix, **optional_keys, )
[docs] def export(self, output_dir: PathLike) -> str: """ Export as a ``.gsmd`` file and return the output filename. :param output_dir: :rtype: .. latex:clearpage:: """ export_filename = os.path.join(output_dir, f"{self.name}.gsmd") gzip_util.write_gzip_json(export_filename, self.to_dict(), indent=None) return export_filename
[docs] @classmethod def from_file(cls: Type["Datafile"], filename: PathLike) -> "Datafile": """ Parse a ``gsmd`` file. :param filename: The input filename. """ as_dict: Dict[str, Any] = gzip_util.read_gzip_json(filename) # type: ignore[assignment] return Datafile.from_dict(as_dict)
[docs]class GCMSDataInfo(NamedTuple): """ Represents information about a :class:`~pyms.GCMS.Class.GCMS_data` object returned by :func:`get_info_from_gcms_data`. """ #: The minimum and maximum retention times. rt_range: Tuple[float, float] #: The average time step between scans. time_step: float #: The standard deviation of the time steps between scans. time_step_stdev: float #: The total number of scans. n_scans: int #: The minimum and maximum mass (*m/z*) values. mz_range: Tuple[float, float] #: The mean average number of masses per scan. num_mz_mean: float #: The median number of masses per scan. num_mz_median: float
[docs]def get_info_from_gcms_data(gcms_data: GCMS_data) -> GCMSDataInfo: """ Returns a information about the data in a :class:`pyms.GCMS.Class.GCMS_data` object. :param gcms_data: """ # TODO: within pyms make read only properties for these private attributes # calculate median number of m/z values measured per scan scan_list = gcms_data.scan_list n_list = [len(scan) for scan in scan_list] n_mz_mean = mean(n_list) n_mz_median = median(n_list) min_mass = gcms_data.min_mass or -1 max_mass = gcms_data.max_mass or -1 info = GCMSDataInfo( rt_range=(gcms_data._min_rt / 60, gcms_data._max_rt / 60), time_step=gcms_data._time_step, time_step_stdev=gcms_data._time_step_std, n_scans=len(scan_list), mz_range=(min_mass, max_mass), num_mz_mean=n_mz_mean, num_mz_median=n_mz_median, ) return info
[docs]@attr.define class Repeat: """ Represents a repeat sample in a project, constructed from a datafile. :default user: taken from the currently logged-in user. :default device: taken from the current device's hostname. :default date_created: is the current date and time. :default date_modified: is the current date and time. """ #: The :class:`~.Datafile` for this repeat. datafile: Datafile peaks: PeakList = attr.field(converter=_to_peak_list) #: Peaks containing identities from library search. This is usually populated after peak alignment. qualified_peaks: Optional[List[QualifiedPeak]] = attr.field(default=None) #: The user who created the :class:`~.Repeat`. user: str = attr.field(factory=getpass.getuser) #: The device that created the :class:`~.Repeat`. device: str = attr.field(factory=socket.gethostname) #: The date and time the :class:`~.Repeat` was created. date_created: datetime = attr.field(factory=datetime.now) #: The date and time the :class:`~.Repeat` was last modified. date_modified: datetime = attr.field(factory=datetime.now) #: File format version version: int = attr.field(default=1) @property def name(self) -> str: """ The name of the :class:`~.Datafile`. :rtype: .. versionadded:: 0.4.0 """ return self.datafile.name
[docs] def to_dict(self) -> Dict[str, Any]: """ Returns a dictionary representation of this :class:`~.Repeat`. All keys are native, JSON-serializable, Python objects. """ if self.qualified_peaks is None: qualified_peaks_as_list = None else: qualified_peaks_as_list = [qp.to_dict() for qp in self.qualified_peaks] return { "datafile": self.datafile.to_dict(), "peaks": self.peaks.to_list(), "qualified_peaks": qualified_peaks_as_list, "user": self.user, "device": self.device, "date_created": self.date_created.isoformat(), "date_modified": self.date_modified.isoformat(), "version": self.version, }
[docs] @classmethod def from_dict(cls: Type["Repeat"], d: Mapping[str, Any]) -> "Repeat": """ Construct a :class:`~.Repeat` from a dictionary. :param d: """ datafile = Datafile.from_dict(d["datafile"]) peaks = PeakList(peak_from_dict(peak) for peak in d["peaks"]) peaks.datafile_name = datafile.name qualified_peaks_as_list = d["qualified_peaks"] if qualified_peaks_as_list is None: qualified_peaks = None else: qualified_peaks = [QualifiedPeak.from_dict(peak) for peak in d["qualified_peaks"]] optional_keys = {} if "user" in d: optional_keys["user"] = d["user"] if "device" in d: optional_keys["device"] = d["device"] if "date_created" in d: optional_keys["date_created"] = datetime.fromisoformat(d["date_created"]) if "date_modified" in d: optional_keys["date_modified"] = datetime.fromisoformat(d["date_modified"]) if "version" in d: optional_keys["version"] = d["version"] return cls( datafile=datafile, peaks=peaks, qualified_peaks=qualified_peaks, **optional_keys, )
[docs] def export(self, output_dir: PathLike) -> str: """ Export as a ``.gsmr`` file and return the output filename. .. versionadded:: 0.4.0 :param output_dir: """ export_filename = os.path.join(output_dir, f"{self.name}.gsmr") gzip_util.write_gzip_json(export_filename, self.to_dict(), indent=None) return export_filename
[docs] @classmethod def from_file(cls: Type["Repeat"], filename: PathLike) -> "Repeat": """ Parse a ``gsmr`` file. :param filename: The input filename. :rtype: .. versionadded:: 0.4.0 .. latex:clearpage:: """ as_dict: Dict[str, Any] = gzip_util.read_gzip_json(filename) # type: ignore[assignment] return cls.from_dict(as_dict)