#!/usr/bin/env python3.8
# coding: latin-1
# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
cytoflow.utility.logicle_scale
------------------------------
A scale that transforms the data using the ``logicle`` function.
`LogicleScale` -- implements `IScale`, the `cytoflow` interface for the scale.
`MatplotlibLogicleScale` -- inherits `matplotlib.scale.ScaleBase`, implements
the matplotlib interface
`LogicleMajorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where major
tics are along a plot axis.
`LogicleMinorLocator` -- inherits `matplotlib.ticker.Locator`, lets matplotlib know where minor
tics are along a plot axis
"""
import math, sys
from warnings import warn
from traits.api import (HasStrictTraits, HasTraits, Float, Property, Instance, Str,
cached_property, Undefined, provides, Constant, # @UnresolvedImport
List, Array)
import numpy as np
import pandas as pd
import matplotlib.scale
from matplotlib import transforms
from matplotlib.ticker import NullFormatter, LogFormatterMathtext
from matplotlib.ticker import Locator
import matplotlib.colors
from .scale import IScale, register_scale
from .logicle_ext.Logicle import FastLogicle
from .cytoflow_errors import CytoflowError, CytoflowWarning
[docs]
@provides(IScale)
class LogicleScale(HasStrictTraits):
"""
A scale that transforms the data using the ``logicle`` function.
This scaling method implements a "linear-like" region around 0, and a
"log-like" region for large values, with a very smooth transition between
them. It's particularly good for compensated data, and data where you have
"negative" events (events with a fluorescence of ~0.)
If you don't have any data around 0, you might be better of with a more
traditional log scale.
The transformation has one parameter, `W`, which specifies the width of
the "linear" range in log10 decades. By default, the optimal value is
estimated from the data; but if you assign a value to `W` it will be used.
``0.5`` is usually a good start.
Attributes
----------
experiment : Instance(cytoflow.Experiment)
the `Experiment` used to estimate the scale parameters.
channel : Str
If set, choose scale parameters from this channel in `experiment`.
One of `channel`, `condition` or `statistic` must be set.
condition : Str
If set, choose scale parameters from this condition in `experiment`.
One of `channel`, `condition` or `statistic` must be set.
statistic : Str
If set, choose scale parameters from this statistic in `experiment`.
One of `channel`, `condition` or `statistic` must be set.
quantiles = Tuple(Float, Float) (default = (0.001, 0.999))
If there are a few very large or very small values, this can throw off
matplotlib's choice of default axis ranges. Set ``quantiles`` to choose
what part of the data to consider when choosing axis ranges.
W : Float (default = estimated from data)
The width of the linear range, in log10 decades. can estimate from data,
or use a fixed value like 0.5.
M : Float (default = 4.5)
The width of the log portion of the display, in log10 decades.
A : Float (default = 0.0)
additional decades of negative data to include. the default display
usually captures all the data, so 0 is fine to start.
r : Float (default = 0.05)
Quantile used to estimate `W`.
References
----------
[1] A new "Logicle" display method avoids deceptive effects of logarithmic
scaling for low signals and compensated data.
Parks DR, Roederer M, Moore WA.
Cytometry A. 2006 Jun;69(6):541-51.
PMID: 16604519
http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.20258/full
[2] Update for the logicle data scale including operational code
implementations.
Moore WA, Parks DR.
Cytometry A. 2012 Apr;81(4):273-7.
doi: 10.1002/cyto.a.22030
PMID: 22411901
http://onlinelibrary.wiley.com/doi/10.1002/cyto.a.22030/full
"""
id = Constant("cytoflow.utility.logicle_scale")
name = "logicle"
experiment = Instance("cytoflow.Experiment")
# what data do we use to compute scale parameters? set one.
channel = Str
condition = Str
statistic = Str
features = List(Str)
data = Array
W = Property(Float, depends_on = "[experiment, channel, M, _T, r]")
M = Float(4.5, desc = "the width of the display in log10 decades")
A = Float(0.0, desc = "additional decades of negative data to include.")
r = Float(0.05, desc = "quantile to use for estimating the W parameter.")
_W = Float(Undefined)
_T = Property(Float, depends_on = "[experiment, condition, channel]")
_logicle = Property(Instance(FastLogicle), depends_on = "[_T, W, M, A]")
def __call__(self, data):
"""
Transforms `data` using this scale.
Careful! May return `NaN` if the scale domain doesn't match the data
(ie, applying a log10 scale to negative numbers.)
"""
try:
logicle_min = self._logicle.inverse(0.0)
logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon)
if isinstance(data, pd.Series):
data = data.clip(logicle_min, logicle_max)
return data.apply(self._logicle.scale)
elif isinstance(data, np.ndarray):
data = np.clip(data, logicle_min, logicle_max)
scale = np.vectorize(self._logicle.scale)
return scale(data)
elif isinstance(data, float):
data = max(min(data, logicle_max), logicle_min)
return self._logicle.scale(data)
elif isinstance(data, int):
data = float(data)
data = max(min(data, logicle_max), logicle_min)
return self._logicle.scale(data)
else:
try:
return list(map(self._logicle.scale, data))
except TypeError as e:
raise CytoflowError("Unknown data type") from e
except ValueError as e:
raise CytoflowError(e.strerror)
[docs]
def inverse(self, data):
"""
Transforms 'data' using the inverse of this scale.
"""
try:
if isinstance(data, pd.Series):
data = data.clip(0, 1.0 - sys.float_info.epsilon)
return data.apply(self._logicle.inverse)
elif isinstance(data, np.ndarray):
data = np.clip(data, 0, 1.0 - sys.float_info.epsilon)
inverse = np.vectorize(self._logicle.inverse)
return inverse(data)
elif isinstance(data, float):
data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0)
return self._logicle.inverse(data)
elif isinstance(data, int):
data = float(data)
data = max(min(data, 1.0 - sys.float_info.epsilon), 0.0)
return self._logicle.inverse(data)
else:
try:
return list(map(self._logicle.inverse, data))
except TypeError as e:
raise CytoflowError("Unknown data type") from e
except ValueError as e:
raise CytoflowError(str(e))
[docs]
def clip(self, data):
"""
Clips data to the range of the scale function
"""
try:
logicle_min = self._logicle.inverse(0.0)
logicle_max = self._logicle.inverse(1.0 - sys.float_info.epsilon)
if isinstance(data, pd.Series):
return data.clip(logicle_min, logicle_max)
elif isinstance(data, np.ndarray):
return np.clip(data, logicle_min, logicle_max)
elif isinstance(data, float):
return max(min(data, logicle_max), logicle_min)
elif isinstance(data, int):
data = float(data)
return max(min(data, logicle_max), logicle_min)
else:
try:
return [max(min(x, logicle_max), logicle_min) for x in data]
except TypeError as e:
raise CytoflowError("Unknown data type") from e
except ValueError as e:
raise CytoflowError(e.strerror)
[docs]
def norm(self, vmin = None, vmax = None):
"""
A factory function that returns `matplotlib.colors.Normalize` instance,
which normalizes values for a `matplotlib` color palette.
"""
# it turns out that Logicle is already defined as a normalization to
# [0, 1]. vmin and vmax don't actually do anything here.
class LogicleNormalize(matplotlib.colors.Normalize):
def __init__(self, scale = None, vmin = None, vmax = None):
super().__init__(vmin, vmax)
self._scale = scale
self.vmin = scale.get_transform().inverted().transform_non_affine(0.0)
self.vmax = scale.get_transform().inverted().transform_non_affine(1.0 - sys.float_info.epsilon)
def __call__(self, data, clip = None):
# it turns out that Logicle is already defined as a
# normalization to [0, 1].
ret = self._scale.get_transform().transform_non_affine(data)
return np.ma.masked_array(ret)
def inverse(self, value):
return self._scale.get_transform().inverted().transform_non_affine(value)
return LogicleNormalize(scale = MatplotlibLogicleScale(None, logicle = self._logicle), vmin = vmin, vmax = vmax)
@cached_property
def _get__T(self):
"The range of possible data values"
if self.experiment is None:
return Undefined
if self.channel and self.channel in self.experiment.channels:
if "range" in self.experiment.metadata[self.channel]:
return float(self.experiment.metadata[self.channel]["range"])
else:
return float(self.experiment.data[self.channel].max())
elif self.condition and self.condition in self.experiment.conditions:
return float(self.experiment.data[self.condition].max())
elif self.statistic in self.experiment.statistics:
stat = self.experiment.statistics[self.statistic][self.features]
return float(stat.max().max())
elif self.data.size > 0:
return float(self.data.max())
else:
return Undefined
@cached_property
def _get_W(self):
if self.experiment is None:
return Undefined
if self._W is not Undefined:
return self._W
if self.channel and self.channel in self.experiment.channels:
data = self.experiment[self.channel]
if self.r <= 0 or self.r >= 1:
raise CytoflowError("r must be between 0 and 1")
# get the range by finding the rth quantile of the negative values
neg_values = data[data < 0]
if(not neg_values.empty):
r_value = neg_values.quantile(self.r)
W = (self.M - math.log10(self._T/math.fabs(r_value)))/2
if W <= 0:
warn("Channel {0} doesn't have enough negative data. "
"Try a log transform instead."
.format(self.channel),
CytoflowWarning)
return 0.5
elif W > 0.5 * self.M:
warn("Channel {0} W is too large -- setting to 0.5 * M. This probably means there's a bug somewhere!",
CytoflowWarning)
return 0.49 * self.M
else:
return W
else:
# ... unless there aren't any negative values, in which case
# you probably shouldn't use this transform
warn("Channel {0} doesn't have any negative data. "
"Try a log transform instead."
.format(self.channel),
CytoflowWarning)
return 0.5
else:
return 0.5 # a reasonable default for non-channel scales
def _set_W(self, value):
self._W = value
@cached_property
def _get__logicle(self):
if self.W is Undefined or self._T is Undefined:
return Undefined
if self._T <= 0:
raise CytoflowError("Logicle range must be > 0")
if self.W < 0:
raise CytoflowError("Logicle param W must be >= 0")
if self.M <= 0:
raise CytoflowError("Logicle param M must be > 0")
if (2 * self.W > self.M):
raise CytoflowError("Logicle param W is too large; it must be "
"less than half of param M.")
if (-self.A > self.W or self.A + self.W > self.M - self.W):
raise CytoflowError("Logicle param A is too large.")
return FastLogicle(self._T, self.W, self.M, self.A)
[docs]
def get_mpl_params(self, ax):
return {"logicle" : self._logicle}
register_scale(LogicleScale)
[docs]
class MatplotlibLogicleScale(HasTraits, matplotlib.scale.ScaleBase):
"""
A class that inherits from `matplotlib.scale.ScaleBase`, which
implements all the bits for `matplotlib` to use a new scale.
"""
name = "logicle"
logicle = Instance(FastLogicle)
def __init__(self, axis, **kwargs):
HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs]
def get_transform(self):
"""
Returns the matplotlib.transform instance that does the actual
transformation
"""
if not self.logicle:
# this usually happens when someone tries to say
# plt.xscale("logicle"). you can, in fact, do that, but
# you have to get a parameterized instance of the transform
# from utility.scale.scale_factory().
raise CytoflowError("You can't set a 'logicle' scale directly.")
return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs]
def set_default_locators_and_formatters(self, axis):
"""
Set the locators and formatters to reasonable defaults for
linear scaling.
"""
axis.set_major_locator(LogicleMajorLocator())
#axis.set_major_formatter(ScalarFormatter())
axis.set_major_formatter(LogFormatterMathtext(10))
axis.set_minor_locator(LogicleMinorLocator())
axis.set_minor_formatter(NullFormatter())
[docs]
class LogicleTransform(HasTraits, transforms.Transform):
# There are two value members that must be defined.
# ``input_dims`` and ``output_dims`` specify number of input
# dimensions and output dimensions to the transformation.
# These are used by the transformation framework to do some
# error checking and prevent incompatible transformations from
# being connected together. When defining transforms for a
# scale, which are, by definition, separable and have only one
# dimension, these members should always be set to 1.
input_dims = 1
output_dims = 1
is_separable = True
has_inverse = True
# the Logicle instance
logicle = Instance(FastLogicle)
def __init__(self, **kwargs):
transforms.Transform.__init__(self)
HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs]
def transform_non_affine(self, values):
try:
logicle_min = self.logicle.inverse(0.0)
logicle_max = self.logicle.inverse(1.0 - sys.float_info.epsilon)
if isinstance(values, pd.Series):
if values.empty:
return values
values = values.clip(logicle_min, logicle_max)
return values.apply(self.logicle.scale)
elif isinstance(values, np.ndarray):
if values.size == 0:
return values
values = np.clip(values, logicle_min, logicle_max)
scale = np.vectorize(self.logicle.scale)
return scale(values)
elif isinstance(values, float):
data = max(min(values, logicle_max), logicle_min)
return self.logicle.scale(data)
elif isinstance(values, int):
data = float(data)
data = max(min(values, logicle_max), logicle_min)
return self.logicle.scale(data)
else:
raise CytoflowError("Unknown data type in MatplotlibLogicleScale.transform_non_affine")
except ValueError as e:
raise CytoflowError("Bad transform") from e
[docs]
def inverted(self):
return MatplotlibLogicleScale.InvertedLogicleTransform(logicle = self.logicle)
[docs]
class InvertedLogicleTransform(HasTraits, transforms.Transform):
input_dims = 1
output_dims = 1
is_separable = True
has_inverse = True
# the Logicle instance
logicle = Instance(FastLogicle)
def __init__(self, **kwargs):
transforms.Transform.__init__(self)
HasTraits.__init__(self, **kwargs) # @UndefinedVariable
[docs]
def transform_non_affine(self, values):
try:
if isinstance(values, pd.Series):
if values.empty():
return values
values = values.clip(0, 1.0 - sys.float_info.epsilon)
return values.apply(self.logicle.inverse)
elif isinstance(values, np.ndarray):
if values.size == 0:
return values
values = np.clip(values, 0, 1.0 - sys.float_info.epsilon)
inverse = np.vectorize(self.logicle.inverse)
return inverse(values)
elif isinstance(values, float):
values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0)
return self.logicle.inverse(values)
elif isinstance(values, int):
values = float(values)
values = max(min(values, 1.0 - sys.float_info.epsilon), 0.0)
return self.logicle.inverse(values)
else:
raise CytoflowError("Unknown data type in LogicleScale.inverse")
except ValueError as e:
raise CytoflowError("Bad transform") from e
[docs]
def inverted(self):
return MatplotlibLogicleScale.LogicleTransform(logicle = self.logicle)
[docs]
class LogicleMajorLocator(Locator):
"""
Determine the tick locations for logicle axes.
Based on matplotlib.LogLocator
"""
[docs]
def set_params(self, **kwargs):
"""Empty"""
pass
def __call__(self):
'Return the locations of the ticks'
vmin, vmax = self.axis.get_view_interval()
return self.tick_values(vmin, vmax)
[docs]
def tick_values(self, vmin, vmax):
'Every decade, including 0 and negative'
vmin, vmax = self.view_limits(vmin, vmax)
logicle = self.axis._scale.logicle
max_decade = np.ceil(np.log10(vmax * 1.1))
min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1
if vmin < 0:
max_negative_decade = np.floor(np.log10(-1.0 * vmin))
major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)]
major_ticks.append(0.0)
else:
max_negative_decade = "N/A"
major_ticks = [0.0] if vmin == 0.0 else []
major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)])
return self.raise_if_exceeds(np.asarray(major_ticks))
[docs]
def view_limits(self, data_min, data_max):
'Try to choose the view limits intelligently'
if data_max < data_min:
data_min, data_max = data_max, data_min
# get the nearest tenth-decade that contains the data
if data_max > 0:
logs = np.ceil(np.log10(data_max))
vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1))
else:
vmax = 100
if data_min >= 0:
vmin = 0
else:
logs = np.ceil(np.log10(-1.0 * data_min))
vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1))
return transforms.nonsingular(vmin, vmax)
[docs]
class LogicleMinorLocator(Locator):
"""
Determine the tick locations for logicle axes.
Based on matplotlib.LogLocator
"""
[docs]
def set_params(self):
"""Empty"""
pass
def __call__(self):
'Return the locations of the ticks'
vmin, vmax = self.axis.get_view_interval()
return self.tick_values(vmin, vmax)
[docs]
def tick_values(self, vmin, vmax):
'Every tenth decade, including 0 and negative'
vmin, vmax = self.view_limits(vmin, vmax)
logicle = self.axis._scale.logicle
max_decade = np.ceil(np.log10(vmax * 1.1)) + 1
min_positive_decade = np.ceil(np.log10(logicle.T()) - logicle.M()) + 1
if vmin < 0:
max_negative_decade = np.floor(np.log10(-1.0 * vmin)) + 1
major_ticks = [-1.0 * 10 ** x for x in np.arange(max_negative_decade, 1, -1)]
major_ticks.append(0.0)
else:
max_negative_decade = "N/A"
major_ticks = [0.0] if vmin == 0.0 else []
major_ticks.extend([10 ** x for x in np.arange(min_positive_decade, max_decade, 1)])
major_tick_pairs = [(major_ticks[x], major_ticks[x+1]) for x in range(len(major_ticks) - 1)]
minor_ticks_lol = [np.arange(x, y, max(np.abs([x, y]) / 10)) for x, y in major_tick_pairs]
minor_ticks = [item for sublist in minor_ticks_lol for item in sublist]
return(minor_ticks)
[docs]
def view_limits(self, data_min, data_max):
'Try to choose the view limits intelligently'
if data_max < data_min:
data_min, data_max = data_max, data_min
# get the nearest tenth-decade that contains the data
if data_max > 0:
logs = np.ceil(np.log10(data_max))
vmax = np.ceil(data_max / (10 ** (logs - 1))) * (10 ** (logs - 1))
else:
vmax = 100
if data_min >= 0:
vmin = 0
else:
logs = np.ceil(np.log10(-1.0 * data_min))
vmin = np.floor(data_min / (10 ** (logs - 1))) * (10 ** (logs - 1))
return transforms.nonsingular(vmin, vmax)
matplotlib.scale.register_scale(MatplotlibLogicleScale)