#!/usr/bin/env python3.4
# coding: latin-1
# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2019
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
cytoflow.operations.bleedthrough_linear
---------------------------------------
'''
import os, math
from traits.api import HasStrictTraits, Str, File, Dict, Instance, \
Constant, Tuple, Float, Any, provides
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.optimize
import cytoflow.views
import cytoflow.utility as util
from .i_operation import IOperation
from .import_op import Tube, ImportOp, check_tube
[docs]@provides(IOperation)
class BleedthroughLinearOp(HasStrictTraits):
"""
Apply matrix-based bleedthrough correction to a set of fluorescence channels.
This is a traditional matrix-based compensation for bleedthrough. For each
pair of channels, the user specifies the proportion of the first channel
that bleeds through into the second; then, the module performs a matrix
multiplication to compensate the raw data.
The module can also estimate the bleedthrough matrix using one
single-color control per channel.
This works best on data that has had autofluorescence removed first;
if that is the case, then the autofluorescence will be subtracted from
the single-color controls too.
To use, set up the :attr:`controls` dict with the single color controls;
call :meth:`estimate` to parameterize the operation; check that the bleedthrough
plots look good by calling :meth:`~.BleedthroughLinearDiagnostic.plot` on
the :class:`BleedthroughLinearDiagnostic` instance returned by
:meth:`default_view`; and then :meth:`apply` on an :class:`.Experiment`.
Attributes
----------
controls : Dict(Str, File)
The channel names to correct, and corresponding single-color control
FCS files to estimate the correction splines with. Must be set to
use :meth:`estimate`.
spillover : Dict(Tuple(Str, Str), Float)
The spillover "matrix" to use to correct the data. The keys are pairs
of channels, and the values are proportions of spectral overlap. If
``("channel1", "channel2")`` is present as a key,
``("channel2", "channel1")`` must also be present. The module does not
assume that the matrix is symmetric.
control_conditions : Dict(Str, Dict(Str, Any))
Occasionally, you'll need to specify the experimental conditions that
the bleedthrough tubes were collected under (to apply the operations in the
history.) Specify them here. The key is the channel name; they value
is a dictionary of the conditions (same as you would specify for a
:class:`~.Tube` )
Examples
--------
Create a small experiment:
.. plot::
:context: close-figs
>>> import cytoflow as flow
>>> import_op = flow.ImportOp()
>>> import_op.tubes = [flow.Tube(file = "tasbe/rby.fcs")]
>>> ex = import_op.apply()
Correct for autofluorescence
.. plot::
:context: close-figs
>>> af_op = flow.AutofluorescenceOp()
>>> af_op.channels = ["Pacific Blue-A", "FITC-A", "PE-Tx-Red-YG-A"]
>>> af_op.blank_file = "tasbe/blank.fcs"
>>> af_op.estimate(ex)
>>> af_op.default_view().plot(ex)
>>> ex2 = af_op.apply(ex)
Create and parameterize the operation
.. plot::
:context: close-figs
>>> bl_op = flow.BleedthroughLinearOp()
>>> bl_op.controls = {'Pacific Blue-A' : 'tasbe/ebfp.fcs',
... 'FITC-A' : 'tasbe/eyfp.fcs',
... 'PE-Tx-Red-YG-A' : 'tasbe/mkate.fcs'}
Estimate the model parameters
.. plot::
:context: close-figs
>>> bl_op.estimate(ex2)
Plot the diagnostic plot
.. note::
The diagnostic plots look really bad in the online documentation.
They're better in a real-world example, I promise!
.. plot::
:context: close-figs
>>> bl_op.default_view().plot(ex2)
Apply the operation to the experiment
.. plot::
:context: close-figs
>>> ex2 = bl_op.apply(ex2)
"""
# traits
id = Constant('edu.mit.synbio.cytoflow.operations.bleedthrough_linear')
friendly_id = Constant("Linear Bleedthrough Correction")
name = Constant("Bleedthrough")
controls = Dict(Str, File)
spillover = Dict(Tuple(Str, Str), Float)
control_conditions = Dict(Str, Dict(Str, Any), {})
_sample = Dict(Str, Any, transient = True)
[docs] def estimate(self, experiment, subset = None):
"""
Estimate the bleedthrough from simgle-channel controls in :attr:`controls`
"""
if experiment is None:
raise util.CytoflowOpError('experiment', "No experiment specified")
channels = list(self.controls.keys())
if len(channels) < 2:
raise util.CytoflowOpError('channels',
"Need at least two channels to correct bleedthrough.")
# make sure the control files exist
for channel in channels:
if not os.path.isfile(self.controls[channel]):
raise util.CytoflowOpError('channels',
"Can't find file {0} for channel {1}."
.format(self.controls[channel], channel))
self.spillover.clear()
self._sample.clear()
for channel in channels:
# make a little Experiment
check_tube(self.controls[channel], experiment)
tube_conditions = self.control_conditions[channel] if channel in self.control_conditions else {}
exp_conditions = {k: experiment.data[k].dtype.name for k in tube_conditions.keys()}
tube_exp = ImportOp(tubes = [Tube(file = self.controls[channel],
conditions = tube_conditions)],
conditions = exp_conditions,
channels = {experiment.metadata[c]["fcs_name"] : c for c in experiment.channels},
name_metadata = experiment.metadata['name_metadata']).apply()
# apply previous operations
for op in experiment.history:
if hasattr(op, 'by'):
for by in op.by:
if 'experiment' in experiment.metadata[by]:
raise util.CytoflowOpError('experiment',
"Prior to applying this operation, "
"you must not apply any operation with 'by' "
"set to an experimental condition.")
tube_exp = op.apply(tube_exp)
# subset it
if subset:
try:
tube_exp = tube_exp.query(subset)
except Exception as exc:
raise util.CytoflowOpError('subset',
"Subset string '{0}' isn't valid"
.format(self.subset)) from exc
if len(tube_exp.data) == 0:
raise util.CytoflowOpError('subset',
"Subset string '{0}' returned no events"
.format(self.subset))
tube_data = tube_exp.data
# polyfit requires sorted data
tube_data.sort_values(channel, inplace = True)
# save a little of the data to plot later
self._sample[channel] = tube_data.sample(n = 1000)
from_channel = channel
# sometimes some of the data is off the edge of the
# plot, and this screws up a linear regression
from_min = np.min(tube_data[from_channel]) * 1.025
from_max = np.max(tube_data[from_channel]) * 0.975
tube_data[from_channel] = \
tube_data[from_channel].clip(from_min, from_max)
for to_channel in channels:
if from_channel == to_channel:
continue
to_min = np.min(tube_data[to_channel]) * 1.025
to_max = np.max(tube_data[to_channel]) * 0.975
tube_data[to_channel] = \
tube_data[to_channel].clip(to_min, to_max)
tube_data.reset_index(drop = True, inplace = True)
f = lambda x, k: x * k
popt, _ = scipy.optimize.curve_fit(f,
tube_data[from_channel],
tube_data[to_channel],
0)
self.spillover[(from_channel, to_channel)] = popt[0]
[docs] def apply(self, experiment):
"""Applies the bleedthrough correction to an experiment.
Parameters
----------
experiment : Experiment
The experiment to which this operation is applied
Returns
-------
Experiment
A new :class:`Experiment` with the bleedthrough subtracted out.
The corrected channels have the following metadata added:
- **linear_bleedthrough** : Dict(Str : Float)
The values for spillover from other channels into this channel.
- **bleedthrough_channels** : List(Str)
The channels that were used to correct this one.
- **bleedthrough_fn** : Callable (Tuple(Float) --> Float)
The function that will correct one event in this channel. Pass it
the values specified in `bleedthrough_channels` and it will return
the corrected value for this channel.
"""
if experiment is None:
raise util.CytoflowOpError('experiment', "No experiment specified")
if not self.spillover:
raise util.CytoflowOpError('spillover',
"Spillover matrix isn't set. "
"Did you forget to run estimate()?")
for (from_channel, to_channel) in self.spillover:
if not from_channel in experiment.data:
raise util.CytoflowOpError('spillover',
"Can't find channel {0} in experiment"
.format(from_channel))
if not to_channel in experiment.data:
raise util.CytoflowOpError('spillover',
"Can't find channel {0} in experiment"
.format(to_channel))
if not (to_channel, from_channel) in self.spillover:
raise util.CytoflowOpError('spillover',
"Must have both (from, to) and "
"(to, from) keys in self.spillover")
new_experiment = experiment.clone()
# the completely arbitrary ordering of the channels
channels = list(set([x for (x, _) in list(self.spillover.keys())]))
# build the spillover matrix from the spillover dictionary
a = [ [self.spillover[(y, x)] if x != y else 1.0 for x in channels]
for y in channels]
# invert it. use the pseudoinverse in case a is singular
a_inv = np.linalg.pinv(a)
# compute the corrected channels
new_channels = np.dot(experiment.data[channels], a_inv)
# and assign to the new experiment
for i, c in enumerate(channels):
new_experiment[c] = pd.Series(new_channels[:, i])
for channel in channels:
# add the spillover values to the channel's metadata
new_experiment.metadata[channel]['linear_bleedthrough'] = \
{x : self.spillover[(x, channel)]
for x in channels if x != channel}
new_experiment.metadata[channel]['bleedthrough_channels'] = list(channels)
new_experiment.metadata[channel]['bleedthrough_fn'] = lambda x, a_inv = a_inv: np.dot(x, a_inv)
new_experiment.history.append(self.clone_traits(transient = lambda _: True))
return new_experiment
[docs] def default_view(self, **kwargs):
"""
Returns a diagnostic plot to make sure spillover estimation is working.
Returns
-------
IView
An IView, call :meth:`~BleedthroughLinearDiagnostic.plot` to see the diagnostic plots
"""
# the completely arbitrary ordering of the channels
channels = list(set([x for (x, _) in list(self.spillover.keys())]))
if set(self.controls.keys()) != set(channels):
raise util.CytoflowOpError('controls',
"Must have both the controls and bleedthrough to plot")
v = BleedthroughLinearDiagnostic(op = self)
v.trait_set(**kwargs)
return v
[docs]@provides(cytoflow.views.IView)
class BleedthroughLinearDiagnostic(HasStrictTraits):
"""
Plots a scatterplot of each channel vs every other channel and the
bleedthrough line
Attributes
----------
op : Instance(BleedthroughPiecewiseOp)
The operation whose parameters we're viewing. If you made the instance
with :meth:`BleedthroughPLinearOp.default_view`, this is set for you
already.
subset : str
If set, only plot this subset of the underlying data.
"""
# traits
id = Constant("edu.mit.synbio.cytoflow.view.autofluorescencediagnosticview")
friendly_id = Constant("Autofluorescence Diagnostic")
subset = Str
# TODO - why can't I use BleedthroughPiecewiseOp here?
op = Instance(IOperation)
[docs] def plot(self, experiment = None, **kwargs):
"""
Plot a diagnostic of the bleedthrough model computation.
"""
if experiment is None:
raise util.CytoflowViewError('experiment',
"No experiment specified")
if not self.op.controls:
raise util.CytoflowViewError('op',
"No controls specified")
if not self.op.spillover:
raise util.CytoflowViewError('op',
"No spillover matrix specified")
kwargs.setdefault('histtype', 'stepfilled')
kwargs.setdefault('alpha', 0.5)
kwargs.setdefault('antialiased', True)
fig, axes2d = plt.subplots(nrows=3, ncols=3)
# the completely arbitrary ordering of the channels
channels = list(set([x for (x, _) in list(self.op.spillover.keys())]))
num_channels = len(channels)
for to_idx, row in enumerate(axes2d):
for from_idx, ax in enumerate(row):
plt.sca(ax)
from_channel = channels[from_idx]
to_channel = channels[to_idx]
if to_idx == len(axes2d) - 1 or (to_idx == len(axes2d) - 2 and from_idx == len(row) - 1):
plt.xlabel(from_channel)
if from_idx == 0 or (from_idx == 1 and to_idx == 0):
plt.ylabel(to_channel)
if from_idx == to_idx:
ax.set_visible(False)
continue
tube_data = self.op._sample[from_channel]
# for ReadTheDocs, which doesn't have swig
import sys
if sys.modules['cytoflow.utility.logicle_ext.Logicle'].__name__ != 'cytoflow.utility.logicle_ext.Logicle':
scale_name = 'log'
else:
scale_name = 'logicle'
xscale = util.scale_factory(scale_name, experiment, channel = from_channel)
yscale = util.scale_factory(scale_name, experiment, channel = to_channel)
plt.xscale(scale_name, **xscale.get_mpl_params(ax.get_xaxis()))
plt.yscale(scale_name, **yscale.get_mpl_params(ax.get_yaxis()))
plt.scatter(tube_data[from_channel],
tube_data[to_channel],
alpha = 1,
s = 1,
marker = 'o')
xs = np.logspace(-1, math.log(tube_data[from_channel].max(), 10))
ys = xs * self.op.spillover[(from_channel, to_channel)]
plt.plot(xs, ys, 'g-', lw=3)
plt.tight_layout(pad = 0.8)