Source code for cytoflow.operations.bleedthrough_linear

#!/usr/bin/env python3.4
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2021
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.

import os, math

from traits.api import HasStrictTraits, Str, File, Dict, Instance, \
                       Constant, Tuple, Float, Any, provides
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.optimize

import cytoflow.views
import cytoflow.utility as util

from .i_operation import IOperation
from .import_op import Tube, ImportOp, check_tube

[docs]@provides(IOperation) class BleedthroughLinearOp(HasStrictTraits): """ Apply matrix-based bleedthrough correction to a set of fluorescence channels. This is a traditional matrix-based compensation for bleedthrough. For each pair of channels, the user specifies the proportion of the first channel that bleeds through into the second; then, the module performs a matrix multiplication to compensate the raw data. The module can also estimate the bleedthrough matrix using one single-color control per channel. This works best on data that has had autofluorescence removed first; if that is the case, then the autofluorescence will be subtracted from the single-color controls too. To use, set up the :attr:`controls` dict with the single color controls; call :meth:`estimate` to parameterize the operation; check that the bleedthrough plots look good by calling :meth:`~.BleedthroughLinearDiagnostic.plot` on the :class:`BleedthroughLinearDiagnostic` instance returned by :meth:`default_view`; and then :meth:`apply` on an :class:`.Experiment`. Attributes ---------- controls : Dict(Str, File) The channel names to correct, and corresponding single-color control FCS files to estimate the correction splines with. Must be set to use :meth:`estimate`. spillover : Dict(Tuple(Str, Str), Float) The spillover "matrix" to use to correct the data. The keys are pairs of channels, and the values are proportions of spectral overlap. If ``("channel1", "channel2")`` is present as a key, ``("channel2", "channel1")`` must also be present. The module does not assume that the matrix is symmetric. control_conditions : Dict(Str, Dict(Str, Any)) Occasionally, you'll need to specify the experimental conditions that the bleedthrough tubes were collected under (to apply the operations in the history.) Specify them here. The key is the channel name; they value is a dictionary of the conditions (same as you would specify for a :class:`~.Tube` ) Examples -------- Create a small experiment: .. plot:: :context: close-figs >>> import cytoflow as flow >>> import_op = flow.ImportOp() >>> import_op.tubes = [flow.Tube(file = "tasbe/rby.fcs")] >>> ex = import_op.apply() Correct for autofluorescence .. plot:: :context: close-figs >>> af_op = flow.AutofluorescenceOp() >>> af_op.channels = ["Pacific Blue-A", "FITC-A", "PE-Tx-Red-YG-A"] >>> af_op.blank_file = "tasbe/blank.fcs" >>> af_op.estimate(ex) >>> af_op.default_view().plot(ex) >>> ex2 = af_op.apply(ex) Create and parameterize the operation .. plot:: :context: close-figs >>> bl_op = flow.BleedthroughLinearOp() >>> bl_op.controls = {'Pacific Blue-A' : 'tasbe/ebfp.fcs', ... 'FITC-A' : 'tasbe/eyfp.fcs', ... 'PE-Tx-Red-YG-A' : 'tasbe/mkate.fcs'} Estimate the model parameters .. plot:: :context: close-figs >>> bl_op.estimate(ex2) Plot the diagnostic plot .. note:: The diagnostic plots look really bad in the online documentation. They're better in a real-world example, I promise! .. plot:: :context: close-figs >>> bl_op.default_view().plot(ex2) Apply the operation to the experiment .. plot:: :context: close-figs >>> ex2 = bl_op.apply(ex2) """ # traits id = Constant('') friendly_id = Constant("Linear Bleedthrough Correction") name = Constant("Bleedthrough") controls = Dict(Str, File) spillover = Dict(Tuple(Str, Str), Float) control_conditions = Dict(Str, Dict(Str, Any), {}) _sample = Dict(Str, Any, transient = True)
[docs] def estimate(self, experiment, subset = None): """ Estimate the bleedthrough from simgle-channel controls in :attr:`controls` """ if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") channels = list(self.controls.keys()) if len(channels) < 2: raise util.CytoflowOpError('channels', "Need at least two channels to correct bleedthrough.") # make sure the control files exist for channel in channels: if not os.path.isfile(self.controls[channel]): raise util.CytoflowOpError('channels', "Can't find file {0} for channel {1}." .format(self.controls[channel], channel)) self.spillover.clear() self._sample.clear() for channel in channels: # make a little Experiment check_tube(self.controls[channel], experiment) tube_conditions = self.control_conditions[channel] if channel in self.control_conditions else {} exp_conditions = {k:[k] for k in tube_conditions.keys()} tube_exp = ImportOp(tubes = [Tube(file = self.controls[channel], conditions = tube_conditions)], conditions = exp_conditions, channels = {experiment.metadata[c]["fcs_name"] : c for c in experiment.channels}, name_metadata = experiment.metadata['name_metadata']).apply() # apply previous operations for op in experiment.history: if hasattr(op, 'by'): for by in if 'experiment' in experiment.metadata[by]: raise util.CytoflowOpError('experiment', "Prior to applying this operation, " "you must not apply any operation with 'by' " "set to an experimental condition.") tube_exp = op.apply(tube_exp) # subset it if subset: try: tube_exp = tube_exp.query(subset) except Exception as exc: raise util.CytoflowOpError('subset', "Subset string '{0}' isn't valid" .format(self.subset)) from exc if len( == 0: raise util.CytoflowOpError('subset', "Subset string '{0}' returned no events" .format(self.subset)) tube_data = # polyfit requires sorted data tube_data.sort_values(channel, inplace = True) # save a little of the data to plot later self._sample[channel] = tube_data.sample(n = 1000) from_channel = channel # sometimes some of the data is off the edge of the # plot, and this screws up a linear regression from_min = np.min(tube_data[from_channel]) * 1.025 from_max = np.max(tube_data[from_channel]) * 0.975 tube_data[from_channel] = \ tube_data[from_channel].clip(from_min, from_max) for to_channel in channels: if from_channel == to_channel: continue to_min = np.min(tube_data[to_channel]) * 1.025 to_max = np.max(tube_data[to_channel]) * 0.975 tube_data[to_channel] = \ tube_data[to_channel].clip(to_min, to_max) tube_data.reset_index(drop = True, inplace = True) f = lambda x, k: x * k popt, _ = scipy.optimize.curve_fit(f, tube_data[from_channel], tube_data[to_channel], 0) self.spillover[(from_channel, to_channel)] = popt[0]
[docs] def apply(self, experiment): """Applies the bleedthrough correction to an experiment. Parameters ---------- experiment : Experiment The experiment to which this operation is applied Returns ------- Experiment A new :class:`Experiment` with the bleedthrough subtracted out. The corrected channels have the following metadata added: - **linear_bleedthrough** : Dict(Str : Float) The values for spillover from other channels into this channel. - **bleedthrough_channels** : List(Str) The channels that were used to correct this one. - **bleedthrough_fn** : Callable (Tuple(Float) --> Float) The function that will correct one event in this channel. Pass it the values specified in `bleedthrough_channels` and it will return the corrected value for this channel. """ if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") if not self.spillover: raise util.CytoflowOpError('spillover', "Spillover matrix isn't set. " "Did you forget to run estimate()?") for (from_channel, to_channel) in self.spillover: if not from_channel in raise util.CytoflowOpError('spillover', "Can't find channel {0} in experiment" .format(from_channel)) if not to_channel in raise util.CytoflowOpError('spillover', "Can't find channel {0} in experiment" .format(to_channel)) if not (to_channel, from_channel) in self.spillover: raise util.CytoflowOpError('spillover', "Must have both (from, to) and " "(to, from) keys in self.spillover") new_experiment = experiment.clone() # the completely arbitrary ordering of the channels channels = list(set([x for (x, _) in list(self.spillover.keys())])) # build the spillover matrix from the spillover dictionary a = [ [self.spillover[(y, x)] if x != y else 1.0 for x in channels] for y in channels] # invert it. use the pseudoinverse in case a is singular a_inv = np.linalg.pinv(a) # compute the corrected channels new_channels =[channels], a_inv) # and assign to the new experiment for i, c in enumerate(channels): new_experiment[c] = pd.Series(new_channels[:, i]) # make sure the new experiment has the same column order as the old one =[list(] for channel in channels: # add the spillover values to the channel's metadata new_experiment.metadata[channel]['linear_bleedthrough'] = \ {x : self.spillover[(x, channel)] for x in channels if x != channel} new_experiment.metadata[channel]['bleedthrough_channels'] = list(channels) new_experiment.metadata[channel]['bleedthrough_fn'] = lambda x, a_inv = a_inv:, a_inv) new_experiment.history.append(self.clone_traits(transient = lambda _: True)) return new_experiment
[docs] def default_view(self, **kwargs): """ Returns a diagnostic plot to make sure spillover estimation is working. Returns ------- IView An IView, call :meth:`~BleedthroughLinearDiagnostic.plot` to see the diagnostic plots """ # the completely arbitrary ordering of the channels channels = list(set([x for (x, _) in list(self.spillover.keys())])) if set(self.controls.keys()) != set(channels): raise util.CytoflowOpError('controls', "Must have both the controls and bleedthrough to plot") v = BleedthroughLinearDiagnostic(op = self) v.trait_set(**kwargs) return v
[docs]@provides(cytoflow.views.IView) class BleedthroughLinearDiagnostic(HasStrictTraits): """ Plots a scatterplot of each channel vs every other channel and the bleedthrough line Attributes ---------- op : Instance(BleedthroughPiecewiseOp) The operation whose parameters we're viewing. If you made the instance with :meth:`BleedthroughPLinearOp.default_view`, this is set for you already. subset : str If set, only plot this subset of the underlying data. """ # traits id = Constant("") friendly_id = Constant("Autofluorescence Diagnostic") subset = Str # TODO - why can't I use BleedthroughPiecewiseOp here? op = Instance(IOperation)
[docs] def plot(self, experiment = None, **kwargs): """ Plot a diagnostic of the bleedthrough model computation. """ if experiment is None: raise util.CytoflowViewError('experiment', "No experiment specified") if not self.op.controls: raise util.CytoflowViewError('op', "No controls specified") if not self.op.spillover: raise util.CytoflowViewError('op', "No spillover matrix specified") kwargs.setdefault('histtype', 'stepfilled') kwargs.setdefault('alpha', 0.5) kwargs.setdefault('antialiased', True) fig, axes2d = plt.subplots(nrows=3, ncols=3) # the completely arbitrary ordering of the channels channels = list(set([x for (x, _) in list(self.op.spillover.keys())])) num_channels = len(channels) for to_idx, row in enumerate(axes2d): for from_idx, ax in enumerate(row): from_channel = channels[from_idx] to_channel = channels[to_idx] if to_idx == len(axes2d) - 1 or (to_idx == len(axes2d) - 2 and from_idx == len(row) - 1): plt.xlabel(from_channel) if from_idx == 0 or (from_idx == 1 and to_idx == 0): plt.ylabel(to_channel) if from_idx == to_idx: ax.set_visible(False) continue tube_data = self.op._sample[from_channel] # for ReadTheDocs, which doesn't have swig import sys if 'cytoflow.utility.logicle_ext.Logicle' in sys.modules: scale_name = 'logicle' else: scale_name = 'log' xscale = util.scale_factory(scale_name, experiment, channel = from_channel) yscale = util.scale_factory(scale_name, experiment, channel = to_channel) plt.xscale(scale_name, **xscale.get_mpl_params(ax.get_xaxis())) plt.yscale(scale_name, **yscale.get_mpl_params(ax.get_yaxis())) plt.scatter(tube_data[from_channel], tube_data[to_channel], alpha = 1, s = 1, marker = 'o') xs = np.logspace(-1, math.log(tube_data[from_channel].max(), 10)) ys = xs * self.op.spillover[(from_channel, to_channel)] plt.plot(xs, ys, 'g-', lw=3) plt.tight_layout(pad = 0.8)