Source code for cytoflow.operations.flowclean

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflow.operations.flowclean
---------------------------

The `flowclean` module has two classes:

`FlowCleanOp` -- gates an `Experiment` based on a variation in fluorescence
channels over time.

`FlowCleanDiagnostic` -- diagnostic views for FlowCleanOp.

"""

from traits.api import (HasStrictTraits, Str, Dict, Any, Instance, 
                        Constant, List, provides, Int, Float, Bool)

import math
from pathlib import Path
import numpy as np
import scipy.stats
import scipy.signal
import scipy.optimize
import sklearn.neighbors
import statsmodels.nonparametric.bandwidths
import pandas as pd
import matplotlib.pyplot as plt
from warnings import warn

from cytoflow.operations.import_op import Tube
import cytoflow.utility as util
import cytoflow.views

from .i_operation import IOperation

[docs] @provides(IOperation) class FlowCleanOp(HasStrictTraits): """ This module gates events from time slices whose density is low or whose events' fluorescence intensity varies substantially from other slices. This is often due to a bubble or transient clog in the flow cell. The operation assesses whether a tube is "clean" using an algorithm described below. If the tube is already clean, only low-density slices are gated. If the tube is not clean, then a cleaning is attempted, gating slices that are substantially statistically different than the majority. Cleanliness is then assessed again. After calling `estimate()`, `tube_status` is set for each tube, indicating whether it was ``CLEAN`` (clean before the operation), ``CLEANED`` (clean after the gated events are dropped), or ``UNCLEAN`` (still unclean after the gated events are dropped.) This estimation is applied to every tube independently -- that is, to every subset of events with a unique combination of experimental metadata (as determined by the experiment's `Experiment.metadata`). Thus, there is no ``by`` attribute, as with many other data-driven gating operations. When the `apply` function is called, a new boolean condition is created. Its name is the same as the `name` of the operation; it is ``True`` if the event remains after the tube is cleaned, and ``False`` if it was in a dropped slice. .. caution:: For some data sets, the default parameters can make this algorithm *really* zealous. Please check the diagnostic plots and the `tube_statistics` tables to make sure this operation is performing like you expect it to, and adjust the parameters accordingly! Doing so will probably require an understanding of exactly what the algorithm is doing -- see the **Notes** section below for details. Attributes ---------- name : Str The operation name; determines the name of the new metadata column. time_channel : Str The channel that represents time -- often ``HDR-T`` or similar. channels : List(Str) Which fluorescence channel or channels are analyzed for variation? scale : Dict(Str : {"linear", "logicle", "log"}) Re-scale the data in the specified channels before cleaning. If a channel is in `channels` but not in `scale`, the current package-wide default (set with `set_default_scale`) is used. .. important:: This algorithm works *much* better when fluorescence channels are scaled (and not just left ``linear``.) segment_size : Int (default = 1000) The number of events in each bin in the analysis. density_cutoff : Float (default = 0.05) The minimum density CDF to keep. max_discontinuity : Float (default = 0.1) The critical "continuity" -- determines how "different" an adjacent segment must be to be for a tube to be flagged as suspicious. max_drift : Float (default = 0.15) The maximum any individual channel can drift before being flagged as needing cleaning. max_mean_drift : Float (default = 0.13) The maximum the mean of all channels' drift can be before the tube is flagged as needing to be cleaned. segment_cutoff : Float (default = 0.05) The minimum sum-of-measures' CDF to keep. detect_worst_channels_range : Int (default = 0) Should `FlowCleanOp` try to detect the worst channels and use them to flag tubes or trim events? If this attribute is greater than 0, choose channels using the range of the mean of the bins' fluorescence distribution. Often used in combination with `detect_worst_channels_sd`. detect_worst_channels_sd : Int (default = 0) Should `FlowCleanOp` try to detect the worst channels and use them to flag tubes or trim events? If this attribute is greater than 0, choose channels using the standard deviation of the mean of the bins' fluorescence distribution. Often used in combination with `detect_worst_channels_range`. measures : List(String) (default = ("5th percentile", "20th percentile", "50th percentile", "80th percentile", "95th percentile", "mean", "variance", "skewness") ). Which measures should be considered when comparing segments? Must be a subset of the default. dont_clean : Bool (default = False) If ``True``, never clean -- just remove low-density bins. force_clean : Bool (default = False) If ``True``, force cleaning even if the tube passes the original quality checks. Remember, the operation **always** gates low-density bins. tube_status : Dict(Tube : {"CLEAN", "UNCLEAN", "CLEANED"}) Set by `estimate`, has the status of each tube. If the tube didn't need cleaning, it's set to **CLEAN**. If the tube was cleaned and then passed the drift and max discontinuity tests, it's set to **CLEANED**. Otherwise, the tube status is set to **UNCLEAN**. tube_statistics : Dict(Str : pd.DataFrame) Set by `estimate`, has statistics about the drift of each tube. Notes ----- This is inspired by the algorithm in the Bioconductor package ``flowCut`` [1]_. The algorithm works in the following way: 1. Bin the events along the time they were collected. The bin size is determined by `segment_size`. 2. Compute the density (events per unit time) of each bin, estimate a kernel density of that distribution, and remove bins whose density's CDF is less than `density_cutoff`. 3. For each channel, compute the mean intensity in each bin (after scaling the data), then compute the mean drift and the differences between adjacent bins. The mean drift is the (difference between the maximum and minimum mean) divided by the (98th - 2nd percentile difference). If the mean drift is greater than `max_drift`, set the `tube_status` to **UNCLEAN**. 4. Compute the mean drift across all channels. If the mean drift is greater than `max_mean_drift`, set the tube status to **UNCLEAN**. 5. For each channel, see if any adjacent bins have differences in their mean fluorescence more than `max_discontinuity`. If so, set the tube status as **UNCLEAN**. Otherwise, set the tube status as **CLEAN**. 6. If the tube needs to be cleaned, compute the measures from `measures` for each bin in each channel, then sum them over all the channels to obtain a single number for each bin. Estimate a kernel density of that distribution and find the peak with the largest prominence. Fit a normal curve with that peak as the center and discard any bins whose sum-of-measures' two-sided CDF is less than `segment_cutoff`. 7. Re-compute the drift in each channel, the mean drift and maximum drift, and the maximum discontinuity. If any of these are outside of spec, leave the tube status as **UNCLEAN**. Else, set the tube status as **CLEANED**. References ---------- .. [1] Meskas J, Wang S (2024). flowCut: Automated Removal of Outlier Events and Flagging of Files Based on Time Versus Fluorescence Analysis. R package version 1.16.0. https://www.bioconductor.org/packages/release/bioc/html/flowCut.html Examples -------- .. plot:: :context: close-figs Make a little data set. >>> import cytoflow as flow >>> import_op = flow.ImportOp() >>> import_op.tubes = [flow.Tube(file = "Plate01/RFP_Well_A3.fcs", ... conditions = {'Dox' : 10.0}), ... flow.Tube(file = "Plate01/CFP_Well_A4.fcs", ... conditions = {'Dox' : 1.0})] >>> import_op.conditions = {'Dox' : 'float'} >>> ex = import_op.apply() Create and parameterize the operation. .. plot:: :context: close-figs >>> fc_op = flow.FlowCleanOp(name = 'FlowClean', ... time_channel = 'HDR-T', ... channels = ['V2-A', 'Y2-A'], ... scale = {'V2-A' : 'log', ... 'Y2-A' : 'log'}) Set the gate of events that need to be cleaned .. plot:: :context: close-figs >>> fc_op.estimate(ex) Plot a diagnostic view for each tube. .. plot:: :context: close-figs >>> fc_op.default_view().plot(ex, plot_name = "Plate01/RFP_Well_A3.fcs") .. plot:: :context: close-figs >>> fc_op.default_view().plot(ex, plot_name = "Plate01/CFP_Well_A4.fcs") Apply the gate .. plot:: :context: close-figs >>> ex2 = fc_op.apply(ex) """ id = Constant('cytoflow.operations.flowclean') friendly_id = Constant("FlowClean Data Cleaning") name = Str time_channel = Str channels = List(Str) scale = Dict(Str, util.ScaleEnum) segment_size = Int(1000) density_cutoff = Float(0.05) max_drift = Float(0.15) max_mean_drift = Float(0.13) max_discontinuity = Float(0.1) segment_cutoff = Float(0.05) detect_worst_channels_range = Int(0) detect_worst_channels_sd = Int(0) measures = List(Str, value = ("5th percentile", "20th percentile", "50th percentile", "80th percentile", "95th percentile", "mean", "variance", "skewness")) force_clean = Bool(False) dont_clean = Bool(False) tube_status = Dict(Str, Str) tube_statistics = Dict(Str, Instance(pd.DataFrame), transient = True) _tube_status = Dict(Tube, Str) _tube_bins = Dict(Tube, pd.api.typing.DataFrameGroupBy, transient = True) _bin_means = Dict(Tube, Any, transient = True) _bin_kept = Dict(Tube, Instance(np.ndarray), transient = True) _bin_measures = Dict(Tube, Instance(np.ndarray), transient = True) _tube_channels = Dict(Tube, List(Str), transient = True) _tube_stats = Dict(Tube, Instance(pd.DataFrame), transient = True) _bin_means = Dict(Tube, Any, transient = True) _density_kde = Dict(Tube, Any, transient = True) _density_peaks = Dict(Tube, Any, transient = True) _density_pdf = Dict(Tube, Any, transient = True) _measures_kde = Dict(Tube, Any, transient = True) _measures_pdf = Dict(Tube, Any, transient = True) _measures_peaks = Dict(Tube, Any, transient = True) _scale = Dict(Str, Instance(util.IScale), transient = True)
[docs] def estimate(self, experiment, subset = None): # check that subset is None if subset is not None: raise util.CytoflowOpError(None, "'subset' must be None.") if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") if len(self.channels) == 0: raise util.CytoflowOpError('channels', "Must set at least one channel") if len(self.channels) != len(set(self.channels)): raise util.CytoflowOpError('channels', "Must not duplicate channels") if self.time_channel not in experiment.data: raise util.CytoflowOpError('time_channel', "Time channel {} not found in experiment" .format(self.time_channel)) for c in self.channels: if c not in experiment.data: raise util.CytoflowOpError('channels', "Channel {0} not found in the experiment" .format(c)) for c in self.scale: if c not in self.channels: raise util.CytoflowOpError('channels', "Scale set for channel {0}, but it isn't " "in the 'channels'" .format(c)) if self.scale[c] == "linear": warn("Linear scale set for channel {}. FlowClean works a LOT " "better on scaled data; consider scaling with log or " "logicle.".format(c), util.CytoflowOpWarning) if self.detect_worst_channels_range and self.detect_worst_channels_range > len(self.channels): raise util.CytoflowOpError('detect_worst_channels_range', "detect_worst_channels_range can't be " "more than {}".format(len(self.channels))) if self.detect_worst_channels_sd and self.detect_worst_channels_sd > len(self.channels): raise util.CytoflowOpError('detect_worst_channels_sd', "detect_worst_channels_sd can't be " "more than {}".format(len(self.channels))) if self.force_clean and self.dont_clean: raise util.CytoflowOpError('force_clean', "Can't set both `force_clean` and `dont_clean`!") # instantiate scales. estimate the scale params for the ENTIRE data set, # not subsets we get from groupby(). And we need to save it so that # the data is transformed the same way when we apply() for c in self.channels: if c in self.scale: self._scale[c] = util.scale_factory(self.scale[c], experiment, channel = c) # if self.scale[c] == 'log': # self._scale[c].mode = 'mask' else: self._scale[c] = util.scale_factory(util.get_default_scale(), experiment, channel = c) conditions = list(experiment.history[0].tubes[0].conditions.keys()) if len(experiment.history[0].tubes) > 1: g = experiment.data.groupby(conditions, observed = True) for tube in experiment.history[0].tubes: if len(experiment.history[0].tubes) > 1: tube_events = g.get_group(tuple(tube.conditions.values())) else: tube_events = experiment.data.copy(deep = True) # check that the events for a tube are monotonic in the time channel! dx = np.diff(tube_events[self.time_channel]) if not np.all(dx >= 0): warn(f"Events in tube {tube.file} do not have monotonically " "increasing time. Sorting...", util.CytoflowOpWarning) tube_events = tube_events.sort_values(by = self.time_channel) for c in self.channels: tube_events.loc[:, c] = self._scale[c](tube_events[c]) num_segments = int(len(tube_events) / self.segment_size) if num_segments < 10: warn(f"Need at least {10 * self.segment_size} events to clean; tube {tube.file} only has {len(tube_events)}. Skipping.") continue segment_size = int(len(tube_events) / num_segments) labels = np.repeat(range(0, num_segments), segment_size) if len(tube_events) > len(labels): labels = np.append(labels, [num_segments - 1] * (len(tube_events) - len(labels))) self._tube_bins[tube] = tube_events.groupby(labels, observed = True) self._bin_kept[tube] = np.full((num_segments), True) self._bin_measures[tube] = np.zeros((num_segments)) self._tube_stats[tube] = pd.DataFrame(index = self.channels, columns = ['Bin Mean Range', 'Bin Mean SD', 'Drift Pre', 'Drift Post', 'Max Discontinuity Pre', 'Max Discontinuity Post', 'Worst Channel (Range)', 'Worst Channel (SD)']) tube_stats = self._tube_stats[tube] self._tube_status[tube] = "CLEAN" # compute density for each bin bin_density = np.zeros((num_segments)) for binn, events in self._tube_bins[tube]: start_time = events[self.time_channel].iat[0] end_time = events[self.time_channel].iat[len(events) - 1] assert(end_time >= start_time) bin_density[binn] = len(events) / (end_time - start_time) ### Remove low-density bins # estimate the densith with a gaussian kernel bandwidth = statsmodels.nonparametric.bandwidths.bw_scott(bin_density, "gaussian") support_min = bin_density.min() - bandwidth * 3 support_max = bin_density.max() + bandwidth * 3 support = np.linspace(support_min, support_max, 100) kde = sklearn.neighbors.KernelDensity(kernel = "gaussian", bandwidth = bandwidth).fit(bin_density[:, np.newaxis]) log_density = kde.score_samples(support[:, np.newaxis]) density = np.exp(log_density) self._density_kde[tube] = (support, density) # find peaks in the kde density_max = np.max(density) peaks, peak_props = scipy.signal.find_peaks(density, prominence = 0.1 * density_max) peaks = [support[x] for x in peaks] self._density_peaks[tube] = peaks if len(peaks) == 0: warn("No peaks found in the density KDE for {}".format(tube.file), util.CytoflowOpWarning) continue # fit a gaussian with the mean at the maximum peak max_peak_idx = np.argmax(peak_props['prominences']) max_peak = self._density_peaks[tube][max_peak_idx] pdf = lambda x, sd: scipy.stats.norm.pdf(x, loc = max_peak, scale = sd) opt_sd = scipy.optimize.curve_fit(pdf, support, density)[0][0] # compute the probability of observing each element in measure_sum self._density_pdf[tube] = (max_peak, opt_sd) density_probability = scipy.stats.norm.cdf(bin_density, loc = max_peak, scale = opt_sd) # remove low-probability bins for binn, _ in self._tube_bins[tube]: if density_probability[binn] < self.density_cutoff: self._bin_kept[tube][binn] = False if self.dont_clean: continue ### Evaluate drift # compute bin means self._bin_means[tube] = {channel : np.zeros((num_segments)) for channel in self.channels} kept_bin_means = {} for channel in self.channels: for binn, events in self._tube_bins[tube]: self._bin_means[tube][channel][binn] = events[channel].mean() kept_bin_means[channel] = np.compress(self._bin_kept[tube], self._bin_means[tube][channel]) tube_stats.loc[channel, 'Bin Mean Range'] = np.ptp(kept_bin_means[channel]) tube_stats.loc[channel, 'Bin Mean SD'] = np.std(kept_bin_means[channel]) # find the "worst" channels worst_channels_range = [] if self.detect_worst_channels_range: worst_channels_range = list(tube_stats.sort_values(by = 'Bin Mean Range').index[-1 * self.detect_worst_channels_range :]) for channel in worst_channels_range: tube_stats.loc[channel, 'Worst Channel (Range)'] = "*" worst_channels_sd = [] if self.detect_worst_channels_sd: worst_channels_sd = list(tube_stats.sort_values(by = 'Bin Mean SD').index[-1 * self.detect_worst_channels_sd :]) for channel in worst_channels_sd: tube_stats.loc[channel, 'Worst Channel (SD)'] = "*" if worst_channels_range and not worst_channels_sd: channels = worst_channels_range elif worst_channels_sd and not worst_channels_range: channels = worst_channels_sd elif worst_channels_range and worst_channels_sd: channels = list(set(worst_channels_range).union(set(worst_channels_sd))) else: channels = self.channels self._tube_channels[tube] = channels # compute each channel's drift and see if it is in spec or whether # the tube needs cleaning for channel in channels: drift = (np.max(kept_bin_means[channel]) - np.min(kept_bin_means[channel])) / (tube_events[channel].quantile(0.98) - tube_events[channel].quantile(0.02)) tube_stats.loc[channel, "Drift Pre"] = drift if drift > self.max_drift: self._tube_status[tube] = "UNCLEAN" # compute the mean of the means and see whether it's in spec, or # the tube needs to be cleaned mean_drift = tube_stats.loc[:, "Drift Pre"].mean(skipna = True) if mean_drift > self.max_mean_drift: self._tube_status[tube] = "UNCLEAN" # check for discontinuities in the channel means for channel in channels: max_discontinuity = 0.0 for i in range(1, len(kept_bin_means[channel])): d = abs(kept_bin_means[channel][i] - kept_bin_means[channel][i-1]) / kept_bin_means[channel][i-1] if d > max_discontinuity: max_discontinuity = d tube_stats.loc[channel, "Max Discontinuity Pre"] = max_discontinuity if max_discontinuity > self.max_discontinuity: self._tube_status[tube] = "UNCLEAN" if self._tube_status[tube] == "CLEAN" and not self.force_clean: continue ### Cleaning measure_sum = np.zeros((num_segments)) for binn, events in self._tube_bins[tube]: for channel in channels: if '5th percentile' in self.measures: measure_sum[binn] += events[channel].quantile(0.05) if '20th percentile' in self.measures: measure_sum[binn] += events[channel].quantile(0.20) if '50th percentile' in self.measures: measure_sum[binn] += events[channel].quantile(0.50) if '80th percentile' in self.measures: measure_sum[binn] += events[channel].quantile(0.80) if '95th percentile' in self.measures: measure_sum[binn] += events[channel].quantile(0.95) if 'mean' in self.measures: measure_sum[binn] += events[channel].mean() if 'variance' in self.measures: measure_sum[binn] += events[channel].var() if 'skewness' in self.measures: measure_sum[binn] += events[channel].skew() # estimate the densith with a gaussian kernel bandwidth = statsmodels.nonparametric.bandwidths.bw_scott(measure_sum, "gaussian") support_min = measure_sum.min() - bandwidth * 3 support_max = measure_sum.max() + bandwidth * 3 support = np.linspace(support_min, support_max, 100) kde = sklearn.neighbors.KernelDensity(kernel = "gaussian", bandwidth = bandwidth).fit(measure_sum[:, np.newaxis]) log_density = kde.score_samples(support[:, np.newaxis]) density = np.exp(log_density) self._measures_kde[tube] = (support, density) # find peaks in the kde peaks, peak_props = scipy.signal.find_peaks(density, prominence = 0) peaks = [support[x] for x in peaks] self._measures_peaks[tube] = peaks if len(peaks) == 0: warn("No peaks found for {}".format(tube.file), util.CytoflowOpWarning) continue # fit a gaussian with the mean at the maximum peak max_peak_idx = np.argmax(peak_props['prominences']) max_peak = self._measures_peaks[tube][max_peak_idx] pdf = lambda x, sd: scipy.stats.norm.pdf(x, loc = max_peak, scale = sd) opt_sd = scipy.optimize.curve_fit(pdf, support, density)[0][0] # compute the probability of observing each element in measure_sum self._measures_pdf[tube] = (max_peak, opt_sd) measures_cdf = scipy.stats.norm.cdf(measure_sum, loc = max_peak, scale = opt_sd) # remove low-probability bins for binn, _ in self._tube_bins[tube]: if min(measures_cdf[binn], 1 - measures_cdf[binn]) < self.segment_cutoff: self._bin_kept[tube][binn] = False kept_bin_means = {} for channel in channels: kept_bin_means[channel] = np.compress(self._bin_kept[tube], self._bin_means[tube][channel]) ### Re-evaluate drift self._tube_status[tube] = "CLEANED" # compute each channel's drift and see if it is in spec or whether # the tube needs cleaning for channel in channels: drift = (np.max(kept_bin_means[channel]) - np.min(kept_bin_means[channel])) / (tube_events[channel].quantile(0.98) - tube_events[channel].quantile(0.02)) tube_stats.loc[channel, "Drift Post"] = drift if drift > self.max_drift: self._tube_status[tube] = "UNCLEAN" # compute the mean of the means and see whether it's in spec, or # the tube needs to be cleaned mean_drift = tube_stats.loc[:, "Drift Post"].mean(skipna = True) if mean_drift > self.max_mean_drift: self._tube_status[tube] = "UNCLEAN" # check for discontinuities in the channel means for channel in channels: max_discontinuity = 0.0 for i in range(1, len(kept_bin_means[channel])): d = abs(kept_bin_means[channel][i] - kept_bin_means[channel][i-1]) / kept_bin_means[channel][i-1] if d > max_discontinuity: max_discontinuity = d tube_stats.loc[channel, "Max Discontinuity Post"] = max_discontinuity if max_discontinuity > self.max_discontinuity: self._tube_status[tube] = "UNCLEAN" self.tube_statistics = {k.file : v for k, v in self._tube_stats.items()} self.tube_status = {k.file : v for k, v in self._tube_status.items()} # set atomically to trigger UI update
[docs] def apply(self, experiment): """ Creates a new condition based on whether the event was dropped by the cleaning procedure in `estimate()` -- essentially, a "gate" that cleans each tube's data Parameters ---------- experiment : `Experiment` the `Experiment` to apply the gate to. Returns ------- `Experiment` a new `Experiment` with the new condition. """ if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") # make sure name got set! if not self.name: raise util.CytoflowOpError('name', "You have to set the gate's name " "before applying it!") if self.name != util.sanitize_identifier(self.name): raise util.CytoflowOpError('name', "Name can only contain letters, numbers and underscores." .format(self.name)) if self.name in experiment.data.columns: raise util.CytoflowOpError('name', "Experiment already has a column named {0}" .format(self.name)) if not self._tube_bins.keys(): raise util.CytoflowOpError(None, "No tubes were analyzed. Did you forget to " "call estimate()?") event_assignments = pd.Series([True] * len(experiment), dtype = "bool") for tube, tube_bins in self._tube_bins.items(): for binn, events in tube_bins: if not self._bin_kept[tube][binn]: event_assignments.loc[events.index] = False new_experiment = experiment.clone(deep = False) new_experiment.add_condition(self.name, "bool", event_assignments) new_experiment.history.append(self.clone_traits(transient = lambda _: True)) return new_experiment
[docs] def default_view(self, **kwargs): """ Returns diagnostic plots of `FlowCleanOp`'s actions. Returns ------- `IView` an `IView`, call `plot` to see the diagnostic plot. """ v = FlowCleanDiagnostic(op = self) v.trait_set(**kwargs) return v
[docs] @provides(cytoflow.views.IView) class FlowCleanDiagnostic(HasStrictTraits): """ A diagnostic view for `FlowCleanOp`. Each fluorescence channel is plotted with a scatterplot. Bin means are shown with green line segments, and dropped bins are plotted in orange instead of blue. The plot title shows the channel drift and maximum discontinuity before cleaning, as well as the drift and max discontinuity after cleaning (if cleaning was applied.) There are one or two additional distributions also plotted. The first shows the KDE estimate of bin densities, the peaks that the peak finding algorithm found, and the normal distribution used to determine if a bin density was "abnormal". If the tube needed cleaning, the second shows a KDE estimate of the sum-of-measures distribution, with peaks and the normal distribution used to determine if a bin was to be excluded. (The actual cutoffs are determined by `density_cutoff` and `segment_cutoff`, respectively. Attributes ---------- op : Instance(`FlowCleanOp`) The operation instance whose diagnostic we're plotting. Set automatically if you create the instance using `FlowCleanOp.default_view`. """ id = Constant('cytoflow.view.flowcleandiagnostic') friendly_id = Constant("FlowClean Diagnostic") op = Instance(FlowCleanOp)
[docs] def enum_plots(self, experiment): """ Returns an iterator over the possible plots that this View can produce -- in this case, the `cytoflow.operations.import_op.Tube` filenames. The values returned can be passed to the ``plot_name`` keyword of `plot`. Parameters ---------- experiment : `Experiment` The `Experiment` that will be producing the plots. """ return [Path(tube.file).name for tube in experiment.history[0].tubes]
[docs] def plot(self, experiment, **kwargs): """ Make a diagnostic plot for a `FlowCleanOp` operation. Parameters ---------- plot_name : Str The tube filename to plot. The filenames can also be retrieved from `enum_plots`. This can be a full path or just the filename. alpha : float (default = 0.25) The alpha blending value, between 0 (transparent) and 1 (opaque). s : int (default = 0.5) The size in points^2. marker : a matplotlib marker style, usually a string Specfies the glyph to draw for each point on the scatterplot. See `matplotlib.markers <http://matplotlib.org/api/markers_api.html#module-matplotlib.markers>`_ for examples. Default: 'o' Notes ----- Other ``kwargs`` are passed to `matplotlib.pyplot.scatter <https://matplotlib.org/devdocs/api/_as_gen/matplotlib.pyplot.scatter.html>`_ """ if experiment is None: raise util.CytoflowViewError('experiment', "No experiment specified") plot_name = kwargs.pop('plot_name', None) if plot_name is None: raise util.CytoflowViewError('plot_name', "'plot_name' must be a tube filename") if plot_name not in [tube.file for tube in experiment.history[0].tubes]: if plot_name not in [Path(tube.file).name for tube in experiment.history[0].tubes]: raise util.CytoflowViewError('plot_name', "'plot_name' must be a tube in `experiment`") kwargs.setdefault('alpha', 0.25) kwargs.setdefault('s', 0.5) kwargs.setdefault('marker', 'o') kwargs.setdefault('antialiased', True) tube = next((tube for tube in experiment.history[0].tubes if tube.file == plot_name or Path(tube.file).name == plot_name)) assert(tube in experiment.history[0].tubes) if tube not in self.op._tube_status: raise util.CytoflowViewError(None, "Tube {} was not found in the operation -- did you call estimate()?" .format(tube.file)) num_channels = len(self.op._tube_channels[tube]) if tube in self.op._tube_channels else 0 num_plots = num_channels + 1 # plus one for the density plot if tube in self.op._measures_kde: num_plots += 1 nrow = math.ceil(num_plots / 2) if len(experiment.history[0].tubes) > 1: experiment = experiment.subset(list(tube.conditions.keys()), tuple(tube.conditions.values())) if tube in self.op._tube_channels: for idx, channel in enumerate(self.op._tube_channels[tube]): if self.op._tube_status[tube] == "CLEANED" or self.op._tube_status[tube] == "UNCLEAN": title = "{} : {:.3f} - {:.3f} / {:.3f} - {:.3f}" \ .format(channel, self.op._tube_stats[tube].loc[channel, "Drift Pre"], self.op._tube_stats[tube].loc[channel, "Drift Post"], self.op._tube_stats[tube].loc[channel, "Max Discontinuity Pre"], self.op._tube_stats[tube].loc[channel, "Max Discontinuity Post"]) else: title = "{} : {:.3f} / {:.3f}" \ .format(channel, self.op._tube_stats[tube].loc[channel, "Drift Pre"], self.op._tube_stats[tube].loc[channel, "Max Discontinuity Pre"]) plt.subplot(nrow, 2, idx + 1, title = title) for binn, events in self.op._tube_bins[tube]: plt.scatter(x = events[self.op.time_channel], y = events[channel], c = 'tab:blue' if self.op._bin_kept[tube][binn] else 'tab:orange', **kwargs) plt.plot([min(events[self.op.time_channel]), max(events[self.op.time_channel])], [self.op._bin_means[tube][channel][binn], self.op._bin_means[tube][channel][binn]], "brown") plt.subplot(nrow, 2, num_channels + 1, title = "Bin Density Distribution") support, density = self.op._density_kde[tube] plt.plot(support, density) for peak in self.op._density_peaks[tube]: plt.axvline(peak, color = 'r') if tube in self.op._density_pdf: loc, scale = self.op._density_pdf[tube] plt.plot(support, scipy.stats.norm.pdf(support, loc = loc, scale = scale), color = 'r', linestyle = 'dotted') if tube in self.op._measures_kde: plt.subplot(nrow, 2, num_plots, title = "Measures Distribution") support, density = self.op._measures_kde[tube] plt.plot(support, density) for peak in self.op._measures_peaks[tube]: plt.axvline(peak, color = 'r') if tube in self.op._measures_pdf: loc, scale = self.op._measures_pdf[tube] plt.plot(support, scipy.stats.norm.pdf(support, loc = loc, scale = scale), color = 'r', linestyle = 'dotted') plt.suptitle("{} ({})".format(plot_name, self.op._tube_status[tube]))