Source code for pyimzml.ImzMLParser

# -*- coding: utf-8 -*-

# Copyright 2015 Dominik Fay
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from bisect import bisect_left, bisect_right
import sys
import re
from pathlib import Path

from warnings import warn
import numpy as np

from pyimzml.metadata import Metadata, SpectrumData
from pyimzml.ontology.ontology import convert_cv_param

PRECISION_DICT = {"32-bit float": 'f', "64-bit float": 'd', "32-bit integer": 'i', "64-bit integer": 'l'}
SIZE_DICT = {'f': 4, 'd': 8, 'i': 4, 'l': 8}
INFER_IBD_FROM_IMZML = object()
XMLNS_PREFIX = "{http://psi.hupo.org/ms/mzml}"

param_group_elname = "referenceableParamGroup"
data_processing_elname = "dataProcessing"
instrument_confid_elname = "instrumentConfiguration"


[docs]def choose_iterparse(parse_lib=None): if parse_lib == 'ElementTree': from xml.etree.ElementTree import iterparse elif parse_lib == 'lxml': from lxml.etree import iterparse else: from xml.etree.ElementTree import iterparse return iterparse
def _get_cv_param(elem, accession, deep=False, convert=False): base = './/' if deep else '' node = elem.find('%s%scvParam[@accession="%s"]' % (base, XMLNS_PREFIX, accession)) if node is not None: if convert: return convert_cv_param(accession, node.get('value')) return node.get('value')
[docs]class ImzMLParser: """ Parser for imzML 1.1.0 files (see specification here: https://ms-imaging.org/wp-content/uploads/2009/08/specifications_imzML1.1.0_RC1.pdf ). Iteratively reads the .imzML file into memory while pruning the per-spectrum metadata (everything in <spectrumList> elements) during initialization. Returns a spectrum upon calling getspectrum(i). The binary file is read in every call of getspectrum(i). Use enumerate(parser.coordinates) to get all coordinates with their respective index. Coordinates are always 3-dimensional. If the third spatial dimension is not present in the data, it will be set to zero. The global metadata fields in the imzML file are stored in parser.metadata. Spectrum-specific metadata fields are not stored by default due to avoid memory issues, use the `include_spectra_metadata` parameter if spectrum-specific metadata is needed. """ def __init__( self, filename, parse_lib=None, ibd_file=INFER_IBD_FROM_IMZML, include_spectra_metadata=None, ): """ Opens the two files corresponding to the file name, reads the entire .imzML file and extracts required attributes. Does not read any binary data, yet. :param filename: name of the XML file. Must end with .imzML. Binary data file must be named equally but ending with .ibd Alternatively an open file or Buffer Protocol object can be supplied, if ibd_file is also supplied :param parse_lib: XML-parsing library to use: 'ElementTree' or 'lxml', the later will be used if argument not provided :param ibd_file: File or Buffer Protocol object for the .ibd file. Leave blank to infer it from the imzml filename. Set to None if no data from the .ibd file is needed (getspectrum calls will not work) :param include_spectra_metadata: None, 'full', or a list/set of accession IDs. If 'full' is given, parser.spectrum_full_metadata will be populated with a list of complex objects containing the full metadata for each spectrum. If a list or set is given, parser.spectrum_metadata_fields will be populated with a dict mapping accession IDs to lists. Each list will contain the values for that accession ID for each spectrum. Note that for performance reasons, this mode only searches the spectrum itself for the value. It won't check any referenced referenceable param groups if the accession ID isn't present in the spectrum metadata. """ # ElementTree requires the schema location for finding tags (why?) but # fails to read it from the root element. As this should be identical # for all imzML files, it is hard-coded here and prepended before every tag self.sl = "{http://psi.hupo.org/ms/mzml}" # maps each imzML number format to its struct equivalent self.precisionDict = dict(PRECISION_DICT) # maps each number format character to its amount of bytes used self.sizeDict = dict(SIZE_DICT) self.filename = filename self.mzOffsets = [] self.intensityOffsets = [] self.mzLengths = [] self.intensityLengths = [] # list of all (x,y,z) coordinates as tuples. self.coordinates = [] self.root = None self.metadata = None self.polarity = None if include_spectra_metadata == 'full': self.spectrum_full_metadata = [] elif include_spectra_metadata is not None: include_spectra_metadata = set(include_spectra_metadata) self.spectrum_metadata_fields = { k: [] for k in include_spectra_metadata } self.mzGroupId = self.intGroupId = self.mzPrecision = self.intensityPrecision = None self.iterparse = choose_iterparse(parse_lib) self.__iter_read_spectrum_meta(include_spectra_metadata) if ibd_file is INFER_IBD_FROM_IMZML: # name of the binary file ibd_filename = self._infer_bin_filename(self.filename) self.m = open(ibd_filename, "rb") else: self.m = ibd_file # Dict for basic imzML metadata other than those required for reading # spectra. See method __readimzmlmeta() self.imzmldict = self.__readimzmlmeta() self.imzmldict['max count of pixels z'] = np.asarray(self.coordinates)[:,2].max() @staticmethod def _infer_bin_filename(imzml_path): imzml_path = Path(imzml_path) ibd_path = [f for f in imzml_path.parent.glob('*') if re.match(r'.+\.ibd', str(f), re.IGNORECASE) and f.stem == imzml_path.stem][0] return str(ibd_path) # system method for use of 'with ... as' def __enter__(self): return self # system method for use of 'with ... as' def __exit__(self, exc_t, exc_v, trace): if self.m is not None: self.m.close() def __iter_read_spectrum_meta(self, include_spectra_metadata): """ This method should only be called by __init__. Reads the data formats, coordinates and offsets from the .imzML file and initializes the respective attributes. While traversing the XML tree, the per-spectrum metadata is pruned, i.e. the <spectrumList> element(s) are left behind empty. Supported accession values for the number formats: "MS:1000521", "MS:1000523", "IMS:1000141" or "IMS:1000142". The string values are "32-bit float", "64-bit float", "32-bit integer", "64-bit integer". """ mz_group = int_group = None slist = None elem_iterator = self.iterparse(self.filename, events=("start", "end")) if sys.version_info > (3,): _, self.root = next(elem_iterator) else: _, self.root = elem_iterator.next() is_first_spectrum = True for event, elem in elem_iterator: if elem.tag == self.sl + "spectrumList" and event == "start": self.__process_metadata() slist = elem elif elem.tag == self.sl + "spectrum" and event == "end": self.__process_spectrum(elem, include_spectra_metadata) if is_first_spectrum: self.__read_polarity(elem) is_first_spectrum = False slist.remove(elem) self.__fix_offsets() def __fix_offsets(self): # clean up the mess after morons who use signed 32-bit where unsigned 64-bit is appropriate def fix(array): fixed = [] delta = 0 prev_value = float('nan') for value in array: if value < 0 and prev_value >= 0: delta += 2**32 fixed.append(value + delta) prev_value = value return fixed self.mzOffsets = fix(self.mzOffsets) self.intensityOffsets = fix(self.intensityOffsets) def __process_metadata(self): if self.metadata is None: self.metadata = Metadata(self.root) for param_id, param_group in self.metadata.referenceable_param_groups.items(): if 'm/z array' in param_group.param_by_name: self.mzGroupId = param_id for name, dtype in self.precisionDict.items(): if name in param_group.param_by_name: self.mzPrecision = dtype if 'intensity array' in param_group.param_by_name: self.intGroupId = param_id for name, dtype in self.precisionDict.items(): if name in param_group.param_by_name: self.intensityPrecision = dtype if not hasattr(self, 'mzPrecision'): raise RuntimeError("Could not determine m/z precision") if not hasattr(self, 'intensityPrecision'): raise RuntimeError("Could not determine intensity precision") def __process_spectrum(self, elem, include_spectra_metadata): arrlistelem = elem.find('%sbinaryDataArrayList' % self.sl) mz_group = None int_group = None for e in arrlistelem: ref = e.find('%sreferenceableParamGroupRef' % self.sl).attrib["ref"] if ref == self.mzGroupId: mz_group = e elif ref == self.intGroupId: int_group = e self.mzOffsets.append(int(_get_cv_param(mz_group, 'IMS:1000102'))) self.mzLengths.append(int(_get_cv_param(mz_group, 'IMS:1000103'))) self.intensityOffsets.append(int(_get_cv_param(int_group, 'IMS:1000102'))) self.intensityLengths.append(int(_get_cv_param(int_group, 'IMS:1000103'))) scan_elem = elem.find('%sscanList/%sscan' % (self.sl, self.sl)) x = _get_cv_param(scan_elem, 'IMS:1000050') y = _get_cv_param(scan_elem, 'IMS:1000051') z = _get_cv_param(scan_elem, 'IMS:1000052') if z is not None: self.coordinates.append((int(x), int(y), int(z))) else: self.coordinates.append((int(x), int(y), 1)) if include_spectra_metadata == 'full': self.spectrum_full_metadata.append( SpectrumData(elem, self.metadata.referenceable_param_groups) ) elif include_spectra_metadata: for param in include_spectra_metadata: value = _get_cv_param(elem, param, deep=True, convert=True) self.spectrum_metadata_fields[param].append(value) def __read_polarity(self, elem): # It's too slow to always check all spectra, so first check the referenceable_param_groups # in the header to see if they indicate the polarity. If not, try to detect it from # the first spectrum's full metadata. # LIMITATION: This won't detect "mixed" polarity if polarity is only specified outside the # referenceable_param_groups. param_groups = self.metadata.referenceable_param_groups.values() spectrum_metadata = SpectrumData(elem, self.metadata.referenceable_param_groups) has_positive = ( any('positive scan' in group for group in param_groups) or 'positive scan' in spectrum_metadata ) has_negative = ( any('negative scan' in group for group in param_groups) or 'negative scan' in spectrum_metadata ) if has_positive and has_negative: self.polarity = 'mixed' elif has_positive: self.polarity = 'positive' elif has_negative: self.polarity = 'negative' def __readimzmlmeta(self): """ DEPRECATED - use self.metadata instead, as it has much greater detail and allows for multiple scan settings / instruments. This method should only be called by __init__. Initializes the imzmldict with frequently used metadata from the .imzML file. :return d: dict containing above mentioned meta data :rtype: dict :raises Warning: if an xml attribute has a number format different from the imzML specification """ d = {} scan_settings_list_elem = self.root.find('%sscanSettingsList' % self.sl) instrument_config_list_elem = self.root.find('%sinstrumentConfigurationList' % self.sl) scan_settings_params = [ ("max count of pixels x", "IMS:1000042"), ("max count of pixels y", "IMS:1000043"), ("max dimension x", "IMS:1000044"), ("max dimension y", "IMS:1000045"), ("pixel size x", "IMS:1000046"), ("pixel size y", "IMS:1000047"), ("matrix solution concentration", "MS:1000835"), ] instrument_config_params = [ ("wavelength", "MS:1000843"), ("focus diameter x", "MS:1000844"), ("focus diameter y", "MS:1000845"), ("pulse energy", "MS:1000846"), ("pulse duration", "MS:1000847"), ("attenuation", "MS:1000848"), ] for name, accession in scan_settings_params: try: val = _get_cv_param(scan_settings_list_elem, accession, deep=True, convert=True) if val is not None: d[name] = val except ValueError: warn(Warning('Wrong data type in XML file. Skipped attribute "%s"' % name)) for name, accession in instrument_config_params: try: val = _get_cv_param(instrument_config_list_elem, accession, deep=True, convert=True) if val is not None: d[name] = val except ValueError: warn(Warning('Wrong data type in XML file. Skipped attribute "%s"' % name)) return d
[docs] def get_physical_coordinates(self, i): """ For a pixel index i, return the real-world coordinates in nanometers. This is equivalent to multiplying the image coordinates of the given pixel with the pixel size. :param i: the pixel index :return: a tuple of x and y coordinates. :rtype: Tuple[float] :raises KeyError: if the .imzML file does not specify the attributes "pixel size x" and "pixel size y" """ try: pixel_size_x = self.imzmldict["pixel size x"] pixel_size_y = self.imzmldict["pixel size y"] except KeyError: raise KeyError("Could not find all pixel size attributes in imzML file") image_x, image_y = self.coordinates[i][:2] return image_x * pixel_size_x, image_y * pixel_size_y
[docs] def getspectrum(self, index): """ Reads the spectrum at specified index from the .ibd file. :param index: Index of the desired spectrum in the .imzML file Output: mz_array: numpy.ndarray Sequence of m/z values representing the horizontal axis of the desired mass spectrum intensity_array: numpy.ndarray Sequence of intensity values corresponding to mz_array """ mz_bytes, intensity_bytes = self.get_spectrum_as_string(index) mz_array = np.frombuffer(mz_bytes, dtype=self.mzPrecision) intensity_array = np.frombuffer(intensity_bytes, dtype=self.intensityPrecision) return mz_array, intensity_array
[docs] def get_spectrum_as_string(self, index): """ Reads m/z array and intensity array of the spectrum at specified location from the binary file as a byte string. The string can be unpacked by the struct module. To get the arrays as numbers, use getspectrum :param index: Index of the desired spectrum in the .imzML file :rtype: Tuple[str, str] Output: mz_string: string where each character represents a byte of the mz array of the spectrum intensity_string: string where each character represents a byte of the intensity array of the spectrum """ offsets = [self.mzOffsets[index], self.intensityOffsets[index]] lengths = [self.mzLengths[index], self.intensityLengths[index]] lengths[0] *= self.sizeDict[self.mzPrecision] lengths[1] *= self.sizeDict[self.intensityPrecision] self.m.seek(offsets[0]) mz_string = self.m.read(lengths[0]) self.m.seek(offsets[1]) intensity_string = self.m.read(lengths[1]) return mz_string, intensity_string
[docs] def portable_spectrum_reader(self): """ Builds a PortableSpectrumReader that holds the coordinates list and spectrum offsets in the .ibd file so that the .ibd file can be read without opening the .imzML file again. The PortableSpectrumReader can be safely pickled and unpickled, making it useful for reading the spectra in a distributed environment such as PySpark or PyWren. """ return PortableSpectrumReader(self.coordinates, self.mzPrecision, self.mzOffsets, self.mzLengths, self.intensityPrecision, self.intensityOffsets, self.intensityLengths)
[docs]def getionimage(p, mz_value, tol=0.1, z=1, reduce_func=sum): """ Get an image representation of the intensity distribution of the ion with specified m/z value. By default, the intensity values within the tolerance region are summed. :param p: the ImzMLParser (or anything else with similar attributes) for the desired dataset :param mz_value: m/z value for which the ion image shall be returned :param tol: Absolute tolerance for the m/z value, such that all ions with values mz_value-|tol| <= x <= mz_value+|tol| are included. Defaults to 0.1 :param z: z Value if spectrogram is 3-dimensional. :param reduce_func: the bahaviour for reducing the intensities between mz_value-|tol| and mz_value+|tol| to a single value. Must be a function that takes a sequence as input and outputs a number. By default, the values are summed. :return: numpy matrix with each element representing the ion intensity in this pixel. Can be easily plotted with matplotlib """ tol = abs(tol) im = np.zeros((p.imzmldict["max count of pixels y"], p.imzmldict["max count of pixels x"])) for i, (x, y, z_) in enumerate(p.coordinates): if z_ == 0: UserWarning("z coordinate = 0 present, if you're getting blank images set getionimage(.., .., z=0)") if z_ == z: mzs, ints = map(lambda x: np.asarray(x), p.getspectrum(i)) min_i, max_i = _bisect_spectrum(mzs, mz_value, tol) im[y - 1, x - 1] = reduce_func(ints[min_i:max_i+1]) return im
[docs]def browse(p): """ Create a per-spectrum metadata browser for the parser. Usage:: # get a list of the instrument configurations used in the first pixel instrument_configurations = browse(p).for_spectrum(0).get_ids("instrumentConfiguration") Currently, ``instrumentConfiguration``, ``dataProcessing`` and ``referenceableParamGroup`` are supported. For browsing all spectra iteratively, you should by all means use **ascending** indices. Doing otherwise can result in quadratic runtime. The following example shows how to retrieve all unique instrumentConfigurations used:: browser = browse(p) all_config_ids = set() for i, _ in enumerate(p.coordinates): all_config_ids.update(browser.for_spectrum(i).get_ids("instrumentConfiguration")) This is a list of ids with which you can find the corresponding ``<instrumentConfiguration>`` tag in the xml tree. :param p: the parser :return: the browser """ return _ImzMLMetaDataBrowser(p.root, p.filename, p.sl)
def _bisect_spectrum(mzs, mz_value, tol): ix_l, ix_u = bisect_left(mzs, mz_value - tol), bisect_right(mzs, mz_value + tol) - 1 if ix_l == len(mzs): return len(mzs), len(mzs) if ix_u < 1: return 0, 0 if ix_u == len(mzs): ix_u -= 1 if mzs[ix_l] < (mz_value - tol): ix_l += 1 if mzs[ix_u] > (mz_value + tol): ix_u -= 1 return ix_l, ix_u class _ImzMLMetaDataBrowser(object): def __init__(self, root, fn, sl): self._root = root self._sl = sl self._fn = fn self._iter, self._previous, self._list_elem = None, None, None self.iterparse = choose_iterparse() def for_spectrum(self, i): if self._previous is None or i <= self._previous: self._iter = self.iterparse(self._fn, events=("start", "end")) for event, s in self._iter: if s.tag == self._sl + "spectrumList" and event == "start": self._list_elem = s elif s.tag == self._sl + "spectrum" and event == "end": self._list_elem.remove(s) if s.attrib["index"] == str(i): self._previous = i return _SpectrumMetaDataBrowser(self._root, self._sl, s) class _SpectrumMetaDataBrowser(object): def __init__(self, root, sl, spectrum): self._root = root self._sl = sl self._spectrum = spectrum def get_ids(self, element): param_methods = { param_group_elname: self._find_referenceable_param_groups, data_processing_elname: self._find_data_processing, instrument_confid_elname: self._find_instrument_configurations, } try: return param_methods[element]() except KeyError as e: raise ValueError("Unsupported element: " + str(element)) def _find_referenceable_param_groups(self): param_group_refs = self._spectrum.findall("%sreferenceableParamGroupRef" % self._sl) ids = map(lambda g: g.attrib["ref"], param_group_refs) return ids def _find_instrument_configurations(self): ids = None scan_list = self._spectrum.find("%sscanList" % self._sl) if scan_list: scans = scan_list.findall("%sscan[@instrumentConfigurationRef]" % self._sl) ids = map(lambda s: s.attrib["instrumentConfigurationRef"], scans) if not ids: run = self._root.find("%srun") try: return [run.attrib["defaultInstrumentConfigurationRef"]] except KeyError as _: return list() else: return ids def _find_data_processing(self): try: return self._spectrum.attrib["dataProcessingRef"] except KeyError as _: spectrum_list = self._root.find("%srun/%sspectrumList" % tuple(2 * [self._sl])) try: return [spectrum_list.attrib["defaultDataProcessingRef"]] except KeyError as _: return []
[docs]class PortableSpectrumReader(object): """ A pickle-able class for holding the minimal set of data required for reading, without holding any references to open files that wouldn't survive pickling. """ def __init__(self, coordinates, mzPrecision, mzOffsets, mzLengths, intensityPrecision, intensityOffsets, intensityLengths): self.coordinates = coordinates self.mzPrecision = mzPrecision self.mzOffsets = mzOffsets self.mzLengths = mzLengths self.intensityPrecision = intensityPrecision self.intensityOffsets = intensityOffsets self.intensityLengths = intensityLengths
[docs] def read_spectrum_from_file(self, file, index): """ Reads the spectrum at specified index from the .ibd file. :param file: File or file-like object for the .ibd file :param index: Index of the desired spectrum in the .imzML file Output: mz_array: numpy.ndarray Sequence of m/z values representing the horizontal axis of the desired mass spectrum intensity_array: numpy.ndarray Sequence of intensity values corresponding to mz_array """ file.seek(self.mzOffsets[index]) mz_bytes = file.read(self.mzLengths[index] * SIZE_DICT[self.mzPrecision]) file.seek(self.intensityOffsets[index]) intensity_bytes = file.read(self.intensityLengths[index] * SIZE_DICT[self.intensityPrecision]) mz_array = np.frombuffer(mz_bytes, dtype=self.mzPrecision) intensity_array = np.frombuffer(intensity_bytes, dtype=self.intensityPrecision) return mz_array, intensity_array