Source code for cytoflow.utility.logicle_scale

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflow.utility.logicle_scale
------------------------------
A scale that transforms the data using the ``logicle`` function.
    
`LogicleScale` -- implements `IScale`, the `cytoflow` interface for the scale.

`MatplotlibLogicleScale` -- inherits `matplotlib.scale.ScaleBase`, implements
the matplotlib interface

`LogicleMajorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where major
tics are along a plot axis.

`LogicleMinorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where minor
tics are along a plot axis

"""

import math, sys
from warnings import warn

from traits.api import (HasStrictTraits, HasTraits, Float, Property, Instance, Str,
                        cached_property, Undefined, provides, Constant,  # @UnresolvedImport
                        List, Array)
                       
import numpy as np
import pandas as pd

import matplotlib.scale
from matplotlib import transforms
from matplotlib.ticker import NullFormatter, LogFormatterMathtext
from matplotlib.ticker import Locator
import matplotlib.colors

from .scale import IScale, register_scale
from .logicle_ext.Logicle import FastLogicle
from .cytoflow_errors import CytoflowError, CytoflowWarning

[docs] @provides(IScale) class LogicleScale(HasStrictTraits): """ A scale that transforms the data using the ``logicle`` function. This scaling method implements a "linear-like" region around 0, and a "log-like" region for large values, with a very smooth transition between them. It's particularly good for compensated data, and data where you have "negative" events (events with a fluorescence of ~0.) If you don't have any data around 0, you might be better of with a more traditional log scale. The transformation has one parameter, `W`, which specifies the width of the "linear" range in log10 decades. By default, the optimal value is estimated from the data; but if you assign a value to `W` it will be used. ``0.5`` is usually a good start. Attributes ---------- experiment : Instance(cytoflow.Experiment) the `Experiment` used to estimate the scale parameters. channel : Str If set, choose scale parameters from this channel in `experiment`. One of `channel`, `condition` or `statistic` must be set. condition : Str If set, choose scale parameters from this condition in `experiment`. One of `channel`, `condition` or `statistic` must be set. statistic : Str If set, choose scale parameters from this statistic in `experiment`. One of `channel`, `condition` or `statistic` must be set. quantiles = Tuple(Float, Float) (default = (0.001, 0.999)) If there are a few very large or very small values, this can throw off matplotlib's choice of default axis ranges. Set ``quantiles`` to choose what part of the data to consider when choosing axis ranges. W : Float (default = estimated from data) The width of the linear range, in log10 decades. can estimate from data, or use a fixed value like 0.5. M : Float (default = 4.5) The width of the log portion of the display, in log10 decades. A : Float (default = 0.0) additional decades of negative data to include. the default display usually captures all the data, so 0 is fine to start. r : Float (default = 0.05) Quantile used to estimate `W`. References ---------- [1] A new "Logicle" display method avoids deceptive effects of logarithmic scaling for low signals and compensated data. Parks DR, Roederer M, Moore WA. Cytometry A. 2006 Jun;69(6):541-51. PMID: 16604519 http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.20258/full [2] Update for the logicle data scale including operational code implementations. Moore WA, Parks DR. Cytometry A. 2012 Apr;81(4):273-7. doi: 10.1002/cyto.a.22030 PMID: 22411901 http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.22030/full """ id = Constant("cytoflow.utility.logicle_scale") name = "logicle" experiment = Instance("cytoflow.Experiment") # what data do we use to compute scale parameters? set one. channel = Str condition = Str statistic = Str features = List(Str) data = Array W = Property(Float, depends_on = "[experiment, channel, M, _T, r]") M = Float(4.5, desc = "the width of the display in log10 decades") A = Float(0.0, desc = "additional decades of negative data to include.") r = Float(0.05, desc = "quantile to use for estimating the W parameter.") _W = Float(Undefined) _T = Property(Float, depends_on = "[experiment, condition, channel]") _logicle = Property(Instance(FastLogicle), depends_on = "[_T, W, M, A]") def __call__(self, data): """ Transforms `data` using this scale. Careful! May return `NaN` if the scale domain doesn't match the data (ie, applying a log10 scale to negative numbers.) """ try: logicle_min = self._logicle.inverse(0.0) logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(data, pd.Series): data = data.clip(logicle_min, logicle_max) return data.apply(self._logicle.scale) elif isinstance(data, np.ndarray): data = np.clip(data, logicle_min, logicle_max) scale = np.vectorize(self._logicle.scale) return scale(data) elif isinstance(data, float): data = max(min(data, logicle_max), logicle_min) return self._logicle.scale(data) elif isinstance(data, int): data = float(data) data = max(min(data, logicle_max), logicle_min) return self._logicle.scale(data) else: try: return list(map(self._logicle.scale, data)) except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(e.strerror)
[docs] def inverse(self, data): """ Transforms 'data' using the inverse of this scale. """ try: if isinstance(data, pd.Series): data = data.clip(0, 1.0 - sys.float_info.epsilon) return data.apply(self._logicle.inverse) elif isinstance(data, np.ndarray): data = np.clip(data, 0, 1.0 - sys.float_info.epsilon) inverse = np.vectorize(self._logicle.inverse) return inverse(data) elif isinstance(data, float): data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0) return self._logicle.inverse(data) elif isinstance(data, int): data = float(data) data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0) return self._logicle.inverse(data) else: try: return list(map(self._logicle.inverse, data)) except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(str(e))
[docs] def clip(self, data): """ Clips data to the range of the scale function """ try: logicle_min = self._logicle.inverse(0.0) logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(data, pd.Series): return data.clip(logicle_min, logicle_max) elif isinstance(data, np.ndarray): return np.clip(data, logicle_min, logicle_max) elif isinstance(data, float): return max(min(data, logicle_max), logicle_min) elif isinstance(data, int): data = float(data) return max(min(data, logicle_max), logicle_min) else: try: return [max(min(x, logicle_max), logicle_min) for x in data] except TypeError as e: raise CytoflowError("Unknown data type") from e except ValueError as e: raise CytoflowError(e.strerror)
[docs] def norm(self, vmin = None, vmax = None): """ A factory function that returns `matplotlib.colors.Normalize` instance, which normalizes values for a `matplotlib` color palette. """ # it turns out that Logicle is already defined as a normalization to # [0, 1]. vmin and vmax don't actually do anything here. class LogicleNormalize(matplotlib.colors.Normalize): def __init__(self, scale = None, vmin = None, vmax = None): super().__init__(vmin, vmax) self._scale = scale self.vmin = scale.get_transform().inverted().transform_non_affine(0.0) self.vmax = scale.get_transform().inverted().transform_non_affine(1.0 - sys.float_info.epsilon) def __call__(self, data, clip = None): # it turns out that Logicle is already defined as a # normalization to [0, 1]. ret = self._scale.get_transform().transform_non_affine(data) return np.ma.masked_array(ret) def inverse(self, value): return self._scale.get_transform().inverted().transform_non_affine(value) return LogicleNormalize(scale = MatplotlibLogicleScale(None, logicle = self._logicle), vmin = vmin, vmax = vmax)
@cached_property def _get__T(self): "The range of possible data values" if self.experiment is None: return Undefined if self.channel and self.channel in self.experiment.channels: if "range" in self.experiment.metadata[self.channel]: return float(self.experiment.metadata[self.channel]["range"]) else: return float(self.experiment.data[self.channel].max()) elif self.condition and self.condition in self.experiment.conditions: return float(self.experiment.data[self.condition].max()) elif self.statistic in self.experiment.statistics: stat = self.experiment.statistics[self.statistic][self.features] return float(stat.max().max()) elif self.data.size > 0: return float(self.data.max()) else: return Undefined @cached_property def _get_W(self): if self.experiment is None: return Undefined if self._W is not Undefined: return self._W if self.channel and self.channel in self.experiment.channels: data = self.experiment[self.channel] if self.r <= 0 or self.r >= 1: raise CytoflowError("r must be between 0 and 1") # get the range by finding the rth quantile of the negative values neg_values = data[data < 0] if(not neg_values.empty): r_value = neg_values.quantile(self.r) W = (self.M - math.log10(self._T/math.fabs(r_value)))/2 if W <= 0: warn("Channel {0} doesn't have enough negative data. " "Try a log transform instead." .format(self.channel), CytoflowWarning) return 0.5 elif W > 0.5 * self.M: warn("Channel {0} W is too large -- setting to 0.5 * M. This probably means there's a bug somewhere!", CytoflowWarning) return 0.49 * self.M else: return W else: # ... unless there aren't any negative values, in which case # you probably shouldn't use this transform warn("Channel {0} doesn't have any negative data. " "Try a log transform instead." .format(self.channel), CytoflowWarning) return 0.5 else: return 0.5 # a reasonable default for non-channel scales def _set_W(self, value): self._W = value @cached_property def _get__logicle(self): if self.W is Undefined or self._T is Undefined: return Undefined if self._T <= 0: raise CytoflowError("Logicle range must be > 0") if self.W < 0: raise CytoflowError("Logicle param W must be >= 0") if self.M <= 0: raise CytoflowError("Logicle param M must be > 0") if (2 * self.W > self.M): raise CytoflowError("Logicle param W is too large; it must be " "less than half of param M.") if (-self.A > self.W or self.A + self.W > self.M - self.W): raise CytoflowError("Logicle param A is too large.") return FastLogicle(self._T, self.W, self.M, self.A)
[docs] def get_mpl_params(self, ax): return {"logicle" : self._logicle}
register_scale(LogicleScale)
[docs] class MatplotlibLogicleScale(HasTraits, matplotlib.scale.ScaleBase): """ A class that inherits from `matplotlib.scale.ScaleBase`, which implements all the bits for `matplotlib` to use a new scale. """ name = "logicle" logicle = Instance(FastLogicle) def __init__(self, axis, **kwargs): HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def get_transform(self): """ Returns the matplotlib.transform instance that does the actual transformation """ if not self.logicle: # this usually happens when someone tries to say # plt.xscale("logicle"). you can, in fact, do that, but # you have to get a parameterized instance of the transform # from utility.scale.scale_factory(). raise CytoflowError("You can't set a 'logicle' scale directly.") return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs] def set_default_locators_and_formatters(self, axis): """ Set the locators and formatters to reasonable defaults for linear scaling. """ axis.set_major_locator(LogicleMajorLocator()) #axis.set_major_formatter(ScalarFormatter()) axis.set_major_formatter(LogFormatterMathtext(10)) axis.set_minor_locator(LogicleMinorLocator()) axis.set_minor_formatter(NullFormatter())
[docs] class LogicleTransform(HasTraits, transforms.Transform): # There are two value members that must be defined. # ``input_dims`` and ``output_dims`` specify number of input # dimensions and output dimensions to the transformation. # These are used by the transformation framework to do some # error checking and prevent incompatible transformations from # being connected together. When defining transforms for a # scale, which are, by definition, separable and have only one # dimension, these members should always be set to 1. input_dims = 1 output_dims = 1 is_separable = True has_inverse = True # the Logicle instance logicle = Instance(FastLogicle) def __init__(self, **kwargs): transforms.Transform.__init__(self) HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def transform_non_affine(self, values): try: logicle_min = self.logicle.inverse(0.0) logicle_max = self.logicle.inverse(1.0 - sys.float_info.epsilon) if isinstance(values, pd.Series): if values.empty: return values values = values.clip(logicle_min, logicle_max) return values.apply(self.logicle.scale) elif isinstance(values, np.ndarray): if values.size == 0: return values values = np.clip(values, logicle_min, logicle_max) scale = np.vectorize(self.logicle.scale) return scale(values) elif isinstance(values, float): data = max(min(values, logicle_max), logicle_min) return self.logicle.scale(data) elif isinstance(values, int): data = float(data) data = max(min(values, logicle_max), logicle_min) return self.logicle.scale(data) else: raise CytoflowError("Unknown data type in MatplotlibLogicleScale.transform_non_affine") except ValueError as e: raise CytoflowError("Bad transform") from e
[docs] def inverted(self): return MatplotlibLogicleScale.InvertedLogicleTransform(logicle = self.logicle)
[docs] class InvertedLogicleTransform(HasTraits, transforms.Transform): input_dims = 1 output_dims = 1 is_separable = True has_inverse = True # the Logicle instance logicle = Instance(FastLogicle) def __init__(self, **kwargs): transforms.Transform.__init__(self) HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs] def transform_non_affine(self, values): try: if isinstance(values, pd.Series): if values.empty(): return values values = values.clip(0, 1.0 - sys.float_info.epsilon) return values.apply(self.logicle.inverse) elif isinstance(values, np.ndarray): if values.size == 0: return values values = np.clip(values, 0, 1.0 - sys.float_info.epsilon) inverse = np.vectorize(self.logicle.inverse) return inverse(values) elif isinstance(values, float): values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0) return self.logicle.inverse(values) elif isinstance(values, int): values = float(values) values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0) return self.logicle.inverse(values) else: raise CytoflowError("Unknown data type in LogicleScale.inverse") except ValueError as e: raise CytoflowError("Bad transform") from e
[docs] def inverted(self): return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs] class LogicleMajorLocator(Locator): """ Determine the tick locations for logicle axes. Based on matplotlib.LogLocator """
[docs] def set_params(self, **kwargs): """Empty""" pass
def __call__(self): 'Return the locations of the ticks' vmin, vmax = self.axis.get_view_interval() return self.tick_values(vmin, vmax)
[docs] def tick_values(self, vmin, vmax): 'Every decade, including 0 and negative' vmin, vmax = self.view_limits(vmin, vmax) logicle = self.axis._scale.logicle max_decade = np.ceil(np.log10(vmax * 1.1)) min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1 if vmin < 0: max_negative_decade = np.floor(np.log10(-1.0 * vmin)) major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)] major_ticks.append(0.0) else: max_negative_decade = "N/A" major_ticks = [0.0] if vmin == 0.0 else [] major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)]) return self.raise_if_exceeds(np.asarray(major_ticks))
[docs] def view_limits(self, data_min, data_max): 'Try to choose the view limits intelligently' if data_max < data_min: data_min, data_max = data_max, data_min # get the nearest tenth-decade that contains the data if data_max > 0: logs = np.ceil(np.log10(data_max)) vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1)) else: vmax = 100 if data_min >= 0: vmin = 0 else: logs = np.ceil(np.log10(-1.0 * data_min)) vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1)) return transforms.nonsingular(vmin, vmax)
[docs] class LogicleMinorLocator(Locator): """ Determine the tick locations for logicle axes. Based on matplotlib.LogLocator """
[docs] def set_params(self): """Empty""" pass
def __call__(self): 'Return the locations of the ticks' vmin, vmax = self.axis.get_view_interval() return self.tick_values(vmin, vmax)
[docs] def tick_values(self, vmin, vmax): 'Every tenth decade, including 0 and negative' vmin, vmax = self.view_limits(vmin, vmax) logicle = self.axis._scale.logicle max_decade = np.ceil(np.log10(vmax * 1.1)) + 1 min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1 if vmin < 0: max_negative_decade = np.floor(np.log10(-1.0 * vmin)) + 1 major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)] major_ticks.append(0.0) else: max_negative_decade = "N/A" major_ticks = [0.0] if vmin == 0.0 else [] major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)]) major_tick_pairs = [(major_ticks[x], major_ticks[x+1]) for x in range(len(major_ticks) - 1)] minor_ticks_lol = [np.arange(x, y, max(np.abs([x, y]) / 10)) for x, y in major_tick_pairs] minor_ticks = [item for sublist in minor_ticks_lol for item in sublist] return(minor_ticks)
[docs] def view_limits(self, data_min, data_max): 'Try to choose the view limits intelligently' if data_max < data_min: data_min, data_max = data_max, data_min # get the nearest tenth-decade that contains the data if data_max > 0: logs = np.ceil(np.log10(data_max)) vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1)) else: vmax = 100 if data_min >= 0: vmin = 0 else: logs = np.ceil(np.log10(-1.0 * data_min)) vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1)) return transforms.nonsingular(vmin, vmax)
matplotlib.scale.register_scale(MatplotlibLogicleScale)