Source code for cytoflow.utility.logicle_scale

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflow.utility.logicle_scale
------------------------------
A scale that transforms the data using the ``logicle`` function.
    
`LogicleScale` -- implements `IScale`, the `cytoflow` interface for the scale.

`MatplotlibLogicleScale` -- inherits `matplotlib.scale.ScaleBase`, implements
the matplotlib interface

`LogicleMajorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where major
tics are along a plot axis.

`LogicleMinorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where minor
tics are along a plot axis

"""

import math, sys
from warnings import warn

from traits.api import (HasStrictTraits, HasTraits, Float, Property, Instance, Str,
                        cached_property, Undefined, provides, Constant, Dict,
                        Tuple, Array)
                       
import numpy as np
import pandas as pd

import matplotlib.scale
from matplotlib import transforms
from matplotlib.ticker import NullFormatter, LogFormatterMathtext
from matplotlib.ticker import Locator
import matplotlib.colors

from .scale import IScale, register_scale
from .logicle_ext.Logicle import FastLogicle
from .util_functions import is_numeric
from .cytoflow_errors import CytoflowError, CytoflowWarning

[docs]@provides(IScale) class LogicleScale(HasStrictTraits): """ A scale that transforms the data using the ``logicle`` function. This scaling method implements a "linear-like" region around 0, and a "log-like" region for large values, with a very smooth transition between them. It's particularly good for compensated data, and data where you have "negative" events (events with a fluorescence of ~0.) If you don't have any data around 0, you might be better of with a more traditional log scale. The transformation has one parameter, `W`, which specifies the width of the "linear" range in log10 decades. By default, the optimal value is estimated from the data; but if you assign a value to `W` it will be used. ``0.5`` is usually a good start. Attributes ---------- experiment : Instance(cytoflow.Experiment) the `Experiment` used to estimate the scale parameters. channel : Str If set, choose scale parameters from this channel in `experiment`. One of `channel`, `condition` or `statistic` must be set. condition : Str If set, choose scale parameters from this condition in `experiment`. One of `channel`, `condition` or `statistic` must be set. statistic : Str If set, choose scale parameters from this statistic in `experiment`. One of `channel`, `condition` or `statistic` must be set. quantiles = Tuple(Float, Float) (default = (0.001, 0.999)) If there are a few very large or very small values, this can throw off matplotlib's choice of default axis ranges. Set ``quantiles`` to choose what part of the data to consider when choosing axis ranges. W : Float (default = estimated from data) The width of the linear range, in log10 decades. can estimate from data, or use a fixed value like 0.5. M : Float (default = 4.5) The width of the log portion of the display, in log10 decades. A : Float (default = 0.0) additional decades of negative data to include. the default display usually captures all the data, so 0 is fine to start. r : Float (default = 0.05) Quantile used to estimate `W`. References ---------- [1] A new "Logicle" display method avoids deceptive effects of logarithmic scaling for low signals and compensated data. Parks DR, Roederer M, Moore WA. Cytometry A. 2006 Jun;69(6):541-51. PMID: 16604519 http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.20258/full [2] Update for the logicle data scale including operational code implementations. Moore WA, Parks DR. Cytometry A. 2012 Apr;81(4):273-7. doi: 10.1002/cyto.a.22030 PMID: 22411901 http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.22030/full """ id = Constant("edu.mit.synbio.cytoflow.utility.logicle_scale") name = "logicle" experiment = Instance("cytoflow.Experiment") # what data do we use to compute scale parameters? set one. channel = Str condition = Str statistic = Tuple(Str, Str) error_statistic = Tuple(Str, Str) data = Array W = Property(Float, depends_on = "[experiment, channel, M, _T, r]") M = Float(4.5, desc = "the width of the display in log10 decades") A = Float(0.0, desc = "additional decades of negative data to include.") r = Float(0.05, desc = "quantile to use for estimating the W parameter.") _W = Float(Undefined) _T = Property(Float, depends_on = "[experiment, condition, channel]") _logicle = Property(Instance(FastLogicle), depends_on = "[_T, W, M, A]") def __call__(self, data): """ Transforms `data` using this scale. Careful! May return `NaN` if the scale domain doesn't match the data (ie, applying a log10 scale to negative numbers.) """ try: logicle_min = self._logicle.inverse(0.0) logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(data, pd.Series): data = data.clip(logicle_min, logicle_max) return data.apply(self._logicle.scale) elif isinstance(data, np.ndarray): data = np.clip(data, logicle_min, logicle_max) scale = np.vectorize(self._logicle.scale) return scale(data) elif isinstance(data, float): data = max(min(data, logicle_max), logicle_min) return self._logicle.scale(data) elif isinstance(data, int): data = float(data) data = max(min(data, logicle_max), logicle_min) return self._logicle.scale(data) else: try: return list(map(self._logicle.scale, data)) except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(e.strerror)
[docs] def inverse(self, data): """ Transforms 'data' using the inverse of this scale. """ try: if isinstance(data, pd.Series): data = data.clip(0, 1.0 - sys.float_info.epsilon) return data.apply(self._logicle.inverse) elif isinstance(data, np.ndarray): data = np.clip(data, 0, 1.0 - sys.float_info.epsilon) inverse = np.vectorize(self._logicle.inverse) return inverse(data) elif isinstance(data, float): data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0) return self._logicle.inverse(data) elif isinstance(data, int): data = float(data) data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0) return self._logicle.inverse(data) else: try: return list(map(self._logicle.inverse, data)) except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(str(e))
[docs] def clip(self, data): """ Clips data to the range of the scale function """ try: logicle_min = self._logicle.inverse(0.0) logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(data, pd.Series): return data.clip(logicle_min, logicle_max) elif isinstance(data, np.ndarray): return np.clip(data, logicle_min, logicle_max) elif isinstance(data, float): return max(min(data, logicle_max), logicle_min) elif isinstance(data, int): data = float(data) return max(min(data, logicle_max), logicle_min) else: try: return [max(min(x, logicle_max), logicle_min) for x in data] except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(e.strerror)
[docs] def norm(self, vmin = None, vmax = None): """ A factory function that returns `matplotlib.colors.Normalize` instance, which normalizes values for a `matplotlib` color palette. """ # it turns out that Logicle is already defined as a normalization to # [0, 1]. vmin and vmax don't actually do anything here. class LogicleNormalize(matplotlib.colors.Normalize): def __init__(self, scale = None, vmin = None, vmax = None): super().__init__(vmin, vmax) self._scale = scale self.vmin = scale.inverse(0.0) self.vmax = scale.inverse(1.0 - sys.float_info.epsilon) def __call__(self, data, clip = None): # it turns out that Logicle is already defined as a # normalization to [0, 1]. ret = self._scale(data) return np.ma.masked_array(ret) return LogicleNormalize(scale = self, vmin = vmin, vmax = vmax)
@cached_property def _get__T(self): "The range of possible data values" if self.experiment is None: return Undefined if self.channel and self.channel in self.experiment.channels: if "range" in self.experiment.metadata[self.channel]: return float(self.experiment.metadata[self.channel]["range"]) else: return float(self.experiment.data[self.channel].max()) elif self.condition and self.condition in self.experiment.conditions: return float(self.experiment.data[self.condition].max()) elif self.statistic in self.experiment.statistics \ and not self.error_statistic in self.experiment.statistics: stat = self.experiment.statistics[self.statistic] assert is_numeric(stat) return float(stat.max()) elif self.statistic in self.experiment.statistics and \ self.error_statistic in self.experiment.statistics: stat = self.experiment.statistics[self.statistic] err_stat = self.experiment.statistics[self.error_statistic] try: err_max = max([max(x) for x in err_stat]) return float(err_max) except (TypeError, IndexError): err_max = err_stat.max() stat_max = stat.max() return float(stat_max + err_max) elif self.data.size > 0: return float(self.data.max()) else: return Undefined @cached_property def _get_W(self): if self.experiment is None: return Undefined if self._W is not Undefined: return self._W if self.channel and self.channel in self.experiment.channels: data = self.experiment[self.channel] if self.r <= 0 or self.r >= 1: raise CytoflowError("r must be between 0 and 1") # get the range by finding the rth quantile of the negative values neg_values = data[data < 0] if(not neg_values.empty): r_value = neg_values.quantile(self.r) W = (self.M - math.log10(self._T/math.fabs(r_value)))/2 if W <= 0: warn("Channel {0} doesn't have enough negative data. " "Try a log transform instead." .format(self.channel), CytoflowWarning) return 0.5 else: return W else: # ... unless there aren't any negative values, in which case # you probably shouldn't use this transform warn("Channel {0} doesn't have any negative data. " "Try a log transform instead." .format(self.channel), CytoflowWarning) return 0.5 else: return 0.5 # a reasonable default for non-channel scales def _set_W(self, value): self._W = value @cached_property def _get__logicle(self): if self.W is Undefined or self._T is Undefined: return Undefined if self._T <= 0: raise CytoflowError("Logicle range must be > 0") if self.W < 0: raise CytoflowError("Logicle param W must be >= 0") if self.M <= 0: raise CytoflowError("Logicle param M must be > 0") if (2 * self.W > self.M): raise CytoflowError("Logicle param W is too large; it must be " "less than half of param M.") if (-self.A > self.W or self.A + self.W > self.M - self.W): raise CytoflowError("Logicle param A is too large.") return FastLogicle(self._T, self.W, self.M, self.A)
[docs] def get_mpl_params(self, ax): return {"logicle" : self._logicle}
register_scale(LogicleScale)
[docs]class MatplotlibLogicleScale(HasTraits, matplotlib.scale.ScaleBase): """ A class that inherits from `matplotlib.scale.ScaleBase`, which implements all the bits for `matplotlib` to use a new scale. """ name = "logicle" logicle = Instance(FastLogicle) def __init__(self, axis, **kwargs): HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def get_transform(self): """ Returns the matplotlib.transform instance that does the actual transformation """ if not self.logicle: # this usually happens when someone tries to say # plt.xscale("logicle"). you can, in fact, do that, but # you have to get a parameterized instance of the transform # from utility.scale.scale_factory(). raise CytoflowError("You can't set a 'logicle' scale directly.") return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs] def set_default_locators_and_formatters(self, axis): """ Set the locators and formatters to reasonable defaults for linear scaling. """ axis.set_major_locator(LogicleMajorLocator()) #axis.set_major_formatter(ScalarFormatter()) axis.set_major_formatter(LogFormatterMathtext(10)) axis.set_minor_locator(LogicleMinorLocator()) axis.set_minor_formatter(NullFormatter())
[docs] class LogicleTransform(HasTraits, transforms.Transform): # There are two value members that must be defined. # ``input_dims`` and ``output_dims`` specify number of input # dimensions and output dimensions to the transformation. # These are used by the transformation framework to do some # error checking and prevent incompatible transformations from # being connected together. When defining transforms for a # scale, which are, by definition, separable and have only one # dimension, these members should always be set to 1. input_dims = 1 output_dims = 1 is_separable = True has_inverse = True # the Logicle instance logicle = Instance(FastLogicle) def __init__(self, **kwargs): transforms.Transform.__init__(self) HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def transform_non_affine(self, values): try: logicle_min = self.logicle.inverse(0.0) logicle_max = self.logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(values, pd.Series): values = values.clip(logicle_min, logicle_max) return values.apply(self.logicle.scale) elif isinstance(values, np.ndarray): values = np.clip(values, logicle_min, logicle_max) scale = np.vectorize(self.logicle.scale) return scale(values) elif isinstance(values, float): data = max(min(values, logicle_max), logicle_min) return self.logicle.scale(data) elif isinstance(values, int): data = float(data) data = max(min(values, logicle_max), logicle_min) return self.logicle.scale(data) else: raise CytoflowError("Unknown data type in MatplotlibLogicleScale.transform_non_affine") except ValueError as e: raise CytoflowError("Bad transform") from e
[docs] def inverted(self): return MatplotlibLogicleScale.InvertedLogicleTransform(logicle = self.logicle)
[docs] class InvertedLogicleTransform(HasTraits, transforms.Transform): input_dims = 1 output_dims = 1 is_separable = True has_inverse = True # the Logicle instance logicle = Instance(FastLogicle) def __init__(self, **kwargs): transforms.Transform.__init__(self) HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def transform_non_affine(self, values): try: if isinstance(values, pd.Series): values = values.clip(0, 1.0 - sys.float_info.epsilon) return values.apply(self.logicle.inverse) elif isinstance(values, np.ndarray): values = np.clip(values, 0, 1.0 - sys.float_info.epsilon) inverse = np.vectorize(self.logicle.inverse) return inverse(values) elif isinstance(values, float): values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0) return self.logicle.inverse(values) elif isinstance(values, int): values = float(values) values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0) return self.logicle.inverse(values) else: raise CytoflowError("Unknown data type in LogicleScale.inverse") except ValueError as e: raise CytoflowError("Bad transform") from e
[docs] def inverted(self): return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs]class LogicleMajorLocator(Locator): """ Determine the tick locations for logicle axes. Based on matplotlib.LogLocator """
[docs] def set_params(self, **kwargs): """Empty""" pass
def __call__(self): 'Return the locations of the ticks' vmin, vmax = self.axis.get_view_interval() return self.tick_values(vmin, vmax)
[docs] def tick_values(self, vmin, vmax): 'Every decade, including 0 and negative' vmin, vmax = self.view_limits(vmin, vmax) logicle = self.axis._scale.logicle max_decade = np.ceil(np.log10(vmax * 1.1)) min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1 if vmin < 0: max_negative_decade = np.floor(np.log10(-1.0 * vmin)) major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)] major_ticks.append(0.0) else: max_negative_decade = "N/A" major_ticks = [0.0] if vmin == 0.0 else [] major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)]) return self.raise_if_exceeds(np.asarray(major_ticks))
[docs] def view_limits(self, data_min, data_max): 'Try to choose the view limits intelligently' if data_max < data_min: data_min, data_max = data_max, data_min # get the nearest tenth-decade that contains the data if data_max > 0: logs = np.ceil(np.log10(data_max)) vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1)) else: vmax = 100 if data_min >= 0: vmin = 0 else: logs = np.ceil(np.log10(-1.0 * data_min)) vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1)) return transforms.nonsingular(vmin, vmax)
[docs]class LogicleMinorLocator(Locator): """ Determine the tick locations for logicle axes. Based on matplotlib.LogLocator """
[docs] def set_params(self): """Empty""" pass
def __call__(self): 'Return the locations of the ticks' vmin, vmax = self.axis.get_view_interval() return self.tick_values(vmin, vmax)
[docs] def tick_values(self, vmin, vmax): 'Every tenth decade, including 0 and negative' vmin, vmax = self.view_limits(vmin, vmax) logicle = self.axis._scale.logicle max_decade = np.ceil(np.log10(vmax * 1.1)) + 1 min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1 if vmin < 0: max_negative_decade = np.floor(np.log10(-1.0 * vmin)) + 1 major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)] major_ticks.append(0.0) else: max_negative_decade = "N/A" major_ticks = [0.0] if vmin == 0.0 else [] major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)]) major_tick_pairs = [(major_ticks[x], major_ticks[x+1]) for x in range(len(major_ticks) - 1)] minor_ticks_lol = [np.arange(x, y, max(np.abs([x, y]) / 10)) for x, y in major_tick_pairs] minor_ticks = [item for sublist in minor_ticks_lol for item in sublist] return(minor_ticks)
[docs] def view_limits(self, data_min, data_max): 'Try to choose the view limits intelligently' if data_max < data_min: data_min, data_max = data_max, data_min # get the nearest tenth-decade that contains the data if data_max > 0: logs = np.ceil(np.log10(data_max)) vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1)) else: vmax = 100 if data_min >= 0: vmin = 0 else: logs = np.ceil(np.log10(-1.0 * data_min)) vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1)) return transforms.nonsingular(vmin, vmax)
matplotlib.scale.register_scale(MatplotlibLogicleScale)