Source code for cytoflow.operations.gaussian

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflow.operations.gaussian
----------------------------

`gaussian` contains three classes:

`GaussianMixtureOp` -- an operation that fits a Gaussian mixture
model to one or more channels.

`GaussianMixture1DView` -- a diagnostic view that shows how the
`GaussianMixtureOp` estimated its model (on a 1D data set, 
using a histogram).

`GaussianMixture2DView` -- a diagnostic view that shows how the
`GaussianMixtureOp` estimated its model (on a 2D data set,
using a scatter plot).
"""
import re
from warnings import warn

import matplotlib.pyplot as plt
from matplotlib.patches import Polygon, Rectangle

from traits.api import (HasStrictTraits, Str, Dict, Any, Instance, Bool, 
                        Constant, List, provides)

import sklearn.mixture
import scipy.stats
import scipy.linalg

import pandas as pd
import numpy as np

from cytoflow.views import IView, HistogramView, ScatterplotView
import cytoflow.utility as util

from .i_operation import IOperation
from .base_op_views import By1DView, By2DView, AnnotatingView

[docs]@provides(IOperation) class GaussianMixtureOp(HasStrictTraits): """ This module fits a Gaussian mixture model with a specified number of components to one or more channels. If `num_components` ``> 1``, `apply` creates a new categorical metadata variable named ``name``, with possible values ``{name}_1`` .... ``name_n`` where ``n`` is the number of components. An event is assigned to ``name_i`` category if it has the highest posterior probability of having been produced by component ``i``. If an event has a value that is outside the range of one of the channels' scales, then it is assigned to ``{name}_None``. Optionally, if `sigma` is greater than 0, `apply` creates new ``boolean`` metadata variables named ``{name}_1`` ... ``{name}_n`` where ``n`` is the number of components. The column ``{name}_i`` is ``True`` if the event is less than `sigma` standard deviations from the mean of component ``i``. If `num_components` is ``1``, `sigma` must be greater than 0. .. note:: The `sigma` attribute does NOT affect how events are assigned to components in the new ``name`` variable. That is to say, if an event is more than `sigma` standard deviations from ALL of the components, you might expect it would be labeled as ``{name}_None``. It is *not*. An event is only labeled ``{name}_None`` if it has a value that is outside of the channels' scales. Optionally, if `posteriors` is ``True``, `apply` creates a new ``double`` metadata variables named ``{name}_1_posterior`` ... ``{name}_n_posterior`` where ``n`` is the number of components. The column ``{name}_i_posterior`` contains the posterior probability that this event is a member of component ``i``. Finally, the same mixture model (mean and standard deviation) may not be appropriate for every subset of the data. If this is the case, you can use the `by` attribute to specify metadata by which to aggregate the data before estimating (and applying) a mixture model. The number of components must be the same across each subset, though. Attributes ---------- name : Str The operation name; determines the name of the new metadata column channels : List(Str) The channels to apply the mixture model to. scale : Dict(Str : {"linear", "logicle", "log"}) Re-scale the data in the specified channels before fitting. If a channel is in `channels` but not in `scale`, the current package-wide default (set with `set_default_scale`) is used. num_components : Int (default = 1) How many components to fit to the data? Must be a positive integer. sigma : Float If not None, use this operation as a "gate": for each component, create a new boolean variable ``{name}_i`` and if the event is within `sigma` standard deviations, set that variable to ``True``. If `num_components` is ``1``, must be ``> 0``. by : List(Str) A list of metadata attributes to aggregate the data before estimating the model. For example, if the experiment has two pieces of metadata, ``Time`` and ``Dox``, setting `by` to ``["Time", "Dox"]`` will fit the model separately to each subset of the data with a unique combination of ``Time`` and ``Dox``. posteriors : Bool (default = False) If ``True``, add columns named ``{name}_{i}_posterior`` giving the posterior probability that the event is in component ``i``. Useful for filtering out low-probability events. Notes ----- We use the Mahalnobis distance as a multivariate generalization of the number of standard deviations an event is from the mean of the multivariate gaussian. If :math:`\\vec{x}` is an observation from a distribution with mean :math:`\\vec{\\mu}` and :math:`S` is the covariance matrix, then the Mahalanobis distance is :math:`\\sqrt{(x - \\mu)^T \\cdot S^{-1} \\cdot (x - \\mu)}`. Examples -------- .. plot:: :context: close-figs Make a little data set. >>> import cytoflow as flow >>> import_op = flow.ImportOp() >>> import_op.tubes = [flow.Tube(file = "Plate01/RFP_Well_A3.fcs", ... conditions = {'Dox' : 10.0}), ... flow.Tube(file = "Plate01/CFP_Well_A4.fcs", ... conditions = {'Dox' : 1.0})] >>> import_op.conditions = {'Dox' : 'float'} >>> ex = import_op.apply() Create and parameterize the operation. .. plot:: :context: close-figs >>> gm_op = flow.GaussianMixtureOp(name = 'Gauss', ... channels = ['Y2-A'], ... scale = {'Y2-A' : 'log'}, ... num_components = 2) Estimate the clusters .. plot:: :context: close-figs >>> gm_op.estimate(ex) Plot a diagnostic view .. plot:: :context: close-figs >>> gm_op.default_view().plot(ex) Apply the gate .. plot:: :context: close-figs >>> ex2 = gm_op.apply(ex) Plot a diagnostic view with the event assignments .. plot:: :context: close-figs >>> gm_op.default_view().plot(ex2) And with two channels: .. plot:: :context: close-figs >>> gm_op = flow.GaussianMixtureOp(name = 'Gauss', ... channels = ['V2-A', 'Y2-A'], ... scale = {'V2-A' : 'log', ... 'Y2-A' : 'log'}, ... num_components = 2) >>> gm_op.estimate(ex) >>> ex2 = gm_op.apply(ex) >>> gm_op.default_view().plot(ex2) """ id = Constant('edu.mit.synbio.cytoflow.operations.gaussian') friendly_id = Constant("Gaussian Mixture Model") name = Str channels = List(Str) scale = Dict(Str, util.ScaleEnum) num_components = util.PositiveInt(1, allow_zero = False) sigma = util.PositiveFloat(None, allow_zero = False, allow_none = True) by = List(Str) posteriors = Bool(False) # the key is either a single value or a tuple _gmms = Dict(Any, Instance(sklearn.mixture.GaussianMixture), transient = True) _scale = Dict(Str, Instance(util.IScale), transient = True)
[docs] def estimate(self, experiment, subset = None): """ Estimate the Gaussian mixture model parameters Parameters ---------- experiment : Experiment The data to use to estimate the mixture parameters subset : str (default = None) If set, a Python expression to determine the subset of the data to use to in the estimation. """ if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") if len(self.channels) == 0: raise util.CytoflowOpError('channels', "Must set at least one channel") if len(self.channels) != len(set(self.channels)): raise util.CytoflowOpError('channels', "Must not duplicate channels") for c in self.channels: if c not in experiment.data: raise util.CytoflowOpError('channels', "Channel {0} not found in the experiment" .format(c)) for c in self.scale: if c not in self.channels: raise util.CytoflowOpError('channels', "Scale set for channel {0}, but it isn't " "in the experiment" .format(c)) for b in self.by: if b not in experiment.data: raise util.CytoflowOpError('by', "Aggregation metadata {} not found, " "must be one of {}" .format(b, experiment.conditions)) if subset: try: experiment = experiment.query(subset) except: raise util.CytoflowViewError('subset', "Subset string '{0}' isn't valid" .format(subset)) if len(experiment) == 0: raise util.CytoflowViewError('subset', "Subset string '{0}' returned no events" .format(subset)) if self.by: groupby = experiment.data.groupby(self.by) else: # use a lambda expression to return a group that contains # all the events groupby = experiment.data.groupby(lambda _: True) # get the scale. estimate the scale params for the ENTIRE data set, # not subsets we get from groupby(). And we need to save it so that # the data is transformed the same way when we apply() for c in self.channels: if c in self.scale: self._scale[c] = util.scale_factory(self.scale[c], experiment, channel = c) else: self._scale[c] = util.scale_factory(util.get_default_scale(), experiment, channel = c) gmms = {} for group, data_subset in groupby: if len(data_subset) == 0: raise util.CytoflowOpError(None, "Group {} had no data" .format(group)) x = data_subset.loc[:, self.channels[:]] for c in self.channels: x[c] = self._scale[c](x[c]) # drop data that isn't in the scale range for c in self.channels: x = x[~(np.isnan(x[c]))] x = x.values gmm = sklearn.mixture.GaussianMixture(n_components = self.num_components, covariance_type = "full", random_state = 1) gmm.fit(x) if not gmm.converged_: raise util.CytoflowOpError(None, "Estimator didn't converge" " for group {0}" .format(group)) # in the 1D version, we sorted the components by the means -- so # the first component has the lowest mean, the second component # has the next-lowest mean, etc. # that doesn't work in the general case. instead, we assume that # the clusters are likely (?) to be arranged along *one* of the # axes, so we take the |norm| of the mean of each cluster and # sort that way. norms = np.sum(gmm.means_ ** 2, axis = 1) ** 0.5 sort_idx = np.argsort(norms) gmm.means_ = gmm.means_[sort_idx] gmm.weights_ = gmm.weights_[sort_idx] gmm.covariances_ = gmm.covariances_[sort_idx] gmm.precisions_ = gmm.precisions_[sort_idx] gmm.precisions_cholesky_ = gmm.precisions_cholesky_[sort_idx] gmms[group] = gmm self._gmms = gmms
[docs] def apply(self, experiment): """ Assigns new metadata to events using the mixture model estimated in `estimate`. Returns ------- Experiment A new `Experiment` with the new condition variables as described in the class documentation. Also adds the following new statistics: - **mean** : Float the mean of the fitted gaussian in each channel for each component. - **sigma** : (Float, Float) the locations the mean +/- one standard deviation in each channel for each component. - **correlation** : Float the correlation coefficient between each pair of channels for each component. - **proportion** : Float the proportion of events in each component of the mixture model. only added if `num_components` ``> 1``. """ if experiment is None: raise util.CytoflowOpError('experiment', "No experiment specified") if len(self.channels) == 0: raise util.CytoflowOpError('channels', "Must set at least one channel") # make sure name got set! if not self.name: raise util.CytoflowOpError('name', "You have to set the gate's name " "before applying it!") if self.name != util.sanitize_identifier(self.name): raise util.CytoflowOpError('name', "Name can only contain letters, numbers and underscores." .format(self.name)) if self.num_components > 1 and self.name in experiment.data.columns: raise util.CytoflowOpError('name', "Experiment already has a column named {0}" .format(self.name)) if self.sigma is not None: for i in range(1, self.num_components + 1): cname = "{}_{}".format(self.name, i) if cname in experiment.data.columns: raise util.CytoflowOpError('name', "Experiment already has a column named {}" .format(cname)) if self.posteriors: for i in range(1, self.num_components + 1): cname = "{}_{}_posterior".format(self.name, i) if cname in experiment.data.columns: raise util.CytoflowOpError('name', "Experiment already has a column named {}" .format(cname)) if not self._gmms: raise util.CytoflowOpError(None, "No components found. Did you forget to " "call estimate()?") for c in self.channels: if c not in self._scale: raise util.CytoflowOpError(None, "Model scale not set. Did you forget " "to call estimate()?") for c in self.channels: if c not in experiment.channels: raise util.CytoflowOpError('channels', "Channel {0} not found in the experiment" .format(c)) for b in self.by: if b not in experiment.conditions: raise util.CytoflowOpError('by', "Aggregation metadata {} not found, " "must be one of {}" .format(b, experiment.conditions)) # # if self.num_components == 1 and self.sigma == 0.0: # raise util.CytoflowOpError('sigma', # "if num_components is 1, sigma must be > 0.0") if self.num_components == 1 and self.posteriors: warn("If num_components == 1, all posteriors will be 1", util.CytoflowOpWarning) # raise util.CytoflowOpError('posteriors', # "If num_components == 1, all posteriors will be 1.") if self.num_components > 1: event_assignments = pd.Series(["{}_None".format(self.name)] * len(experiment), dtype = "object") if self.sigma is not None: event_gate = {i : pd.Series([False] * len(experiment), dtype = "double") for i in range(self.num_components)} if self.posteriors: event_posteriors = {i : pd.Series([0.0] * len(experiment), dtype = "double") for i in range(self.num_components)} if self.by: groupby = experiment.data.groupby(self.by) else: # use a lambda expression to return a group that # contains all the events groupby = experiment.data.groupby(lambda _: True) # make the statistics components = [x + 1 for x in range(self.num_components)] prop_idx = pd.MultiIndex.from_product([experiment[x].unique() for x in self.by] + [components], names = list(self.by) + ["Component"]) prop_stat = pd.Series(name = "{} : {}".format(self.name, "proportion"), index = prop_idx, dtype = np.dtype(object)).sort_index() mean_idx = pd.MultiIndex.from_product([experiment[x].unique() for x in self.by] + [components] + [self.channels], names = list(self.by) + ["Component"] + ["Channel"]) mean_stat = pd.Series(name = "{} : {}".format(self.name, "mean"), index = mean_idx, dtype = np.dtype(object)).sort_index() sigma_stat = pd.Series(name = "{} : {}".format(self.name, "sigma"), index = mean_idx, dtype = np.dtype(object)).sort_index() interval_stat = pd.Series(name = "{} : {}".format(self.name, "interval"), index = mean_idx, dtype = np.dtype(object)).sort_index() corr_idx = pd.MultiIndex.from_product([experiment[x].unique() for x in self.by] + [components] + [self.channels] + [self.channels], names = list(self.by) + ["Component"] + ["Channel_1"] + ["Channel_2"]) corr_stat = pd.Series(name = "{} : {}".format(self.name, "correlation"), index = corr_idx, dtype = np.dtype(object)).sort_index() for group, data_subset in groupby: if group not in self._gmms: # there weren't any events in this group, so we didn't get # a gmm. continue gmm = self._gmms[group] x = data_subset.loc[:, self.channels[:]] for c in self.channels: x[c] = self._scale[c](x[c]) # which values are missing? x_na = pd.Series([False] * len(x)) for c in self.channels: x_na[np.isnan(x[c]).values] = True x = x.values x_na = x_na.values group_idx = groupby.groups[group] if self.num_components > 1: predicted = np.full(len(x), -1, "int") predicted[~x_na] = gmm.predict(x[~x_na]) predicted_str = pd.Series(["(none)"] * len(predicted)) for c in range(0, self.num_components): predicted_str[predicted == c] = "{0}_{1}".format(self.name, c + 1) predicted_str[predicted == -1] = "{0}_None".format(self.name) predicted_str.index = group_idx event_assignments.iloc[group_idx] = predicted_str # if we're doing sigma-based gating, for each component check # to see if the event is in the sigma gate. if self.sigma is not None: for c in range(self.num_components): s = np.linalg.pinv(gmm.covariances_[c]) mu = gmm.means_[c] # compute the Mahalanobis distance f = lambda x, mu, s: np.dot(np.dot((x - mu).T, s), (x - mu)) dist = np.apply_along_axis(f, 1, x, mu, s) # come up with a threshold based on sigma. you'll note we # didn't sqrt dist: that's because for a multivariate # Gaussian, the square of the Mahalanobis distance is # chi-square distributed p = (scipy.stats.norm.cdf(self.sigma) - 0.5) * 2 thresh = scipy.stats.chi2.ppf(p, 1) event_gate[c].iloc[group_idx] = np.less_equal(dist, thresh) if self.posteriors: p = np.full((len(x), self.num_components), 0.0) p[~x_na] = gmm.predict_proba(x[~x_na]) for c in range(self.num_components): event_posteriors[c].iloc[group_idx] = p[:, c] for c in range(self.num_components): if len(self.by) == 0: g = tuple([c + 1]) elif hasattr(group, '__iter__') and not isinstance(group, (str, bytes)): g = tuple(list(group) + [c + 1]) else: g = tuple([group] + [c + 1]) prop_stat.at[g] = gmm.weights_[c] for cidx1, channel1 in enumerate(self.channels): g2 = tuple(list(g) + [channel1]) mean_stat.at[g2] = self._scale[channel1].inverse(gmm.means_[c, cidx1]) s, corr = util.cov2corr(gmm.covariances_[c]) sigma_stat[g2] = (self._scale[channel1].inverse(s[cidx1])) interval_stat.at[g2] = (self._scale[channel1].inverse(gmm.means_[c, cidx1] - s[cidx1]), self._scale[channel1].inverse(gmm.means_[c, cidx1] + s[cidx1])) for cidx2, channel2 in enumerate(self.channels): g3 = tuple(list(g2) + [channel2]) corr_stat[g3] = corr[cidx1, cidx2] corr_stat.drop(tuple(list(g2) + [channel1]), inplace = True) new_experiment = experiment.clone(deep = False) if self.num_components > 1: new_experiment.add_condition(self.name, "category", event_assignments) if self.sigma is not None: for c in range(self.num_components): gate_name = "{}_{}".format(self.name, c + 1) new_experiment.add_condition(gate_name, "bool", event_gate[c]) if self.posteriors: for c in range(self.num_components): post_name = "{}_{}_posterior".format(self.name, c + 1) new_experiment.add_condition(post_name, "double", event_posteriors[c]) new_experiment.statistics[(self.name, "mean")] = pd.to_numeric(mean_stat) new_experiment.statistics[(self.name, "sigma")] = sigma_stat new_experiment.statistics[(self.name, "interval")] = interval_stat if len(corr_stat) > 0: new_experiment.statistics[(self.name, "correlation")] = pd.to_numeric(corr_stat) if self.num_components > 1: new_experiment.statistics[(self.name, "proportion")] = pd.to_numeric(prop_stat) new_experiment.history.append(self.clone_traits(transient = lambda _: True)) return new_experiment
[docs] def default_view(self, **kwargs): """ Returns a diagnostic plot of the Gaussian mixture model. Returns ------- `IView` An `IView`, call `plot` to see the diagnostic plot. """ channels = kwargs.pop('channels', self.channels) scale = kwargs.pop('scale', self.scale) for c in channels: if c not in self.channels: raise util.CytoflowViewError('channels', "Channel {} isn't in the operation's channels" .format(c)) for s in scale: if s not in self.channels: raise util.CytoflowViewError('scale', "Channel {} isn't in the operation's channels" .format(s)) for c in channels: if c not in scale: scale[c] = util.get_default_scale() if len(channels) == 0: raise util.CytoflowViewError('channels', "Must specify at least one channel for a default view") elif len(channels) == 1: v = GaussianMixture1DView(op = self) v.trait_set(channel = channels[0], scale = scale[channels[0]], **kwargs) return v elif len(channels) == 2: v = GaussianMixture2DView(op = self) v.trait_set(xchannel = channels[0], ychannel = channels[1], xscale = scale[channels[0]], yscale = scale[channels[1]], **kwargs) return v else: raise util.CytoflowViewError('channels', "Can't specify more than two channels for a default view")
[docs]@provides(IView) class GaussianMixture1DView(By1DView, AnnotatingView, HistogramView): """ A default view for `GaussianMixtureOp` that plots the histogram of a single channel, then the estimated Gaussian distributions on top of it. Attributes ---------- """ id = Constant('edu.mit.synbio.cytoflow.view.gaussianmixture1dview') friendly_id = Constant("1D Gaussian Mixture Diagnostic Plot") channel = Str scale = util.ScaleEnum
[docs] def plot(self, experiment, **kwargs): """ Plot the plots. Parameters ---------- """ if experiment is None: raise util.CytoflowViewError('experiment', "No experiment specified") if self.op.num_components == 1: annotation_facet = self.op.name + "_1" else: annotation_facet = self.op.name view, trait_name = self._strip_trait(annotation_facet) if self.channel in self.op._scale: scale = self.op._scale[self.channel] else: scale = util.scale_factory(self.scale, experiment, channel = self.channel) super(GaussianMixture1DView, view).plot(experiment, annotation_facet = annotation_facet, annotation_trait = trait_name, annotations = self.op._gmms, scale = scale, **kwargs)
def _annotation_plot(self, axes, annotation, annotation_facet, annotation_value, annotation_color, **kwargs): # annotation is an instance of mixture.GaussianMixture gmm = annotation if annotation_value is None: for i in range(len(gmm.means_)): self._annotation_plot(axes, annotation, annotation_facet, i, annotation_color, **kwargs) return elif type(annotation_value) is str: try: idx_re = re.compile(annotation_facet + '_(\d+)') idx = idx_re.match(annotation_value).group(1) idx = int(idx) - 1 except: return elif isinstance(annotation_value, np.bool_): if annotation_value: idx = 0 else: return else: idx = annotation_value kwargs.setdefault('orientation', 'vertical') if kwargs['orientation'] == 'horizontal': scale = kwargs['yscale'] patch_area = 0.0 for k in range(0, len(axes.patches)): patch = axes.patches[k] if isinstance(patch, Polygon): xy = patch.get_xy() patch_area += poly_area([scale(p[1]) for p in xy], [p[0] for p in xy]) elif isinstance(patch, Rectangle): for xy in patch.get_path().to_polygons(): patch_area += poly_area([p[1] for p in xy], [p[0] for p in xy]) plt_min, plt_max = plt.gca().get_ylim() y = scale.inverse(np.linspace(scale(scale.clip(plt_min)), scale(scale.clip(plt_max)), 500)) pdf_scale = patch_area * gmm.weights_[idx] mean = gmm.means_[idx][0] stdev = np.sqrt(gmm.covariances_[idx][0]) x = scipy.stats.norm.pdf(scale(y), mean, stdev) * pdf_scale axes.plot(x, y, color = annotation_color) else: scale = kwargs['xscale'] patch_area = 0.0 for k in range(0, len(axes.patches)): patch = axes.patches[k] if isinstance(patch, Polygon): xy = patch.get_xy() patch_area += poly_area([scale(p[0]) for p in xy], [p[1] for p in xy]) elif isinstance(patch, Rectangle): for xy in patch.get_path().to_polygons(): patch_area += poly_area([p[0] for p in xy], [p[1] for p in xy]) plt_min, plt_max = plt.gca().get_xlim() x = scale.inverse(np.linspace(scale(scale.clip(plt_min)), scale(scale.clip(plt_max)), 500)) pdf_scale = patch_area * gmm.weights_[idx] mean = gmm.means_[idx][0] stdev = np.sqrt(gmm.covariances_[idx][0]) y = scipy.stats.norm.pdf(scale(x), mean, stdev) * pdf_scale axes.plot(x, y, color = annotation_color)
# from http://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates
[docs]def poly_area(x,y): return 0.5*np.abs(np.dot(x,np.roll(y,1))-np.dot(y,np.roll(x,1)))
# a few more imports for drawing scaled ellipses import matplotlib.path as path import matplotlib.patches as patches import matplotlib.transforms as transforms
[docs]@provides(IView) class GaussianMixture2DView(By2DView, AnnotatingView, ScatterplotView): """ A default view for `GaussianMixtureOp` that plots the scatter plot of a two channels, then the estimated 2D Gaussian distributions on top of it. Attributes ---------- """ id = Constant('edu.mit.synbio.cytoflow.view.gaussianmixture2dview') friendly_id = Constant("2D Gaussian Mixture Diagnostic Plot") xchannel = Str xscale = util.ScaleEnum ychannel = Str yscale = util.ScaleEnum
[docs] def plot(self, experiment, **kwargs): """ Plot the plots. Parameters ---------- """ if experiment is None: raise util.CytoflowViewError('experiment', "No experiment specified") if self.op.num_components == 1: annotation_facet = self.op.name + "_1" else: annotation_facet = self.op.name view, trait_name = self._strip_trait(annotation_facet) if self.xchannel in self.op._scale: xscale = self.op._scale[self.xchannel] else: xscale = util.scale_factory(self.xscale, experiment, channel = self.xchannel) if self.ychannel in self.op._scale: yscale = self.op._scale[self.ychannel] else: yscale = util.scale_factory(self.yscale, experiment, channel = self.ychannel) super(GaussianMixture2DView, view).plot(experiment, annotation_facet = annotation_facet, annotation_trait = trait_name, annotations = self.op._gmms, xscale = xscale, yscale = yscale, **kwargs)
def _annotation_plot(self, axes, annotation, annotation_facet, annotation_value, annotation_color, **kwargs): # annotation is an instance of mixture.GaussianMixture gmm = annotation if annotation_value is None: for i in range(len(gmm.means_)): self._annotation_plot(axes, annotation, annotation_facet, i, annotation_color, **kwargs) return elif isinstance(annotation_value, str): try: idx_re = re.compile(annotation_facet + '_(\d+)') idx = idx_re.match(annotation_value).group(1) idx = int(idx) - 1 except: return elif isinstance(annotation_value, np.bool_): if annotation_value: idx = 0 else: return else: idx = annotation_value xscale = kwargs['xscale'] yscale = kwargs['yscale'] mean = gmm.means_[idx] covar = gmm.covariances_[idx] v, w = scipy.linalg.eigh(covar) u = w[0] / scipy.linalg.norm(w[0]) #rotation angle (in degrees) t = np.arctan(u[1] / u[0]) t = 180 * t / np.pi # in order to scale the ellipses correctly, we have to make them # ourselves out of an affine-scaled unit circle. The interface # is the same as matplotlib.patches.Ellipse _plot_ellipse(axes, xscale, yscale, mean, np.sqrt(v[0]), np.sqrt(v[1]), 180 + t, color = annotation_color, fill = False, linewidth = 2) _plot_ellipse(axes, xscale, yscale, mean, np.sqrt(v[0]) * 2, np.sqrt(v[1]) * 2, 180 + t, color = annotation_color, fill = False, linewidth = 2, alpha = 0.66) _plot_ellipse(axes, xscale, yscale, mean, np.sqrt(v[0]) * 3, np.sqrt(v[1]) * 3, 180 + t, color = annotation_color, fill = False, linewidth = 2, alpha = 0.33)
def _plot_ellipse(ax, xscale, yscale, center, width, height, angle, **kwargs): tf = transforms.Affine2D() \ .scale(width, height) \ .rotate_deg(angle) \ .translate(*center) tf_path = tf.transform_path(path.Path.unit_circle()) v = tf_path.vertices v = np.vstack((xscale.inverse(v[:, 0]), yscale.inverse(v[:, 1]))).T scaled_path = path.Path(v, tf_path.codes) scaled_patch = patches.PathPatch(scaled_path, **kwargs) ax.add_patch(scaled_patch) util.expand_class_attributes(GaussianMixture1DView) util.expand_method_parameters(GaussianMixture1DView, GaussianMixture1DView.plot) util.expand_class_attributes(GaussianMixture2DView) util.expand_method_parameters(GaussianMixture2DView, GaussianMixture2DView.plot)