#!/usr/bin/env python3.8
# coding: latin-1
# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
cytoflow.experiment
-------------------
Defines `Experiment`, `cytoflow`'s main data structure.
`Experiment` -- manages the data and metadata for a flow experiment.
"""
import pandas as pd
from natsort import natsorted
from pandas.api.types import CategoricalDtype, is_categorical_dtype
from traits.api import (HasStrictTraits, Dict, List, Instance, Str, Any,
Property, Tuple)
import cytoflow.utility as util
class Experiment(HasStrictTraits):
    """
    An Experiment manages all the data and metadata for a flow experiment.

    An `Experiment` is the central data structure in `cytoflow`: it
    wraps a `pandas.DataFrame` containing all the data from a flow
    experiment. Each row in the table is an event.  Each column is either a
    measurement from one of the detectors (or a "derived" measurement such as
    a transformed value or a ratio), or a piece of metadata associated with
    that event: which tube it came from, what the experimental conditions for
    that tube were, gate membership, etc.  The `Experiment` object lets
    you:

    - Add additional metadata to define subpopulations
    - Get events that match a particular metadata signature.

    Additionally, the `Experiment` object manages channel- and
    experiment-level metadata in the `metadata` attribute, which is a
    dictionary.  This allows the rest of the modules in `cytoflow` to track
    and enforce other constraints that are important in doing quantitative
    flow cytometry: for example, every tube must be collected with the same
    channel parameters (such as PMT voltage.)

    .. note::

        `Experiment` is not responsible for enforcing the constraints;
        `ImportOp` and the other modules are.

    Attributes
    ----------
    data : pandas.DataFrame
        All the events and metadata represented by this experiment.  Each event
        is a row; each column is either a measured channel (eg. a fluorescence
        measurement), a derived channel (eg. the ratio between two channels),
        or a piece of metadata.  Metadata can be either experimental conditions
        (eg. induction level, timepoint) or added by operations (eg. gate
        membership).

    metadata : Dict(Str : Dict(Str : Any))
        Each column in `data` has an entry in `metadata` whose key
        is the column name and whose value is a dict of column-specific
        metadata.  Metadata is added by operations, and is occasionally useful
        if modules are expected to work together.  See individual operations'
        documentation for a list of the metadata that operation adds.  The only
        "required" metadata is ``type``, which can be ``channel`` (if the
        column is a measured channel, or derived from one) or ``condition``
        (if the column is an experimental condition, gate membership, etc.)

        .. warning::

            There may also be experiment-wide entries in `metadata` that
            are *not* columns in `data`!

    history : List(IOperation)
        The `IOperation` operations that have been applied to the raw
        data to result in this `Experiment`.

    statistics : Dict((Str, Str) : pandas.Series)
        The statistics and parameters computed by models that were fit to the
        data.  The key is an ``(Str, Str)`` tuple, where the first ``Str`` is
        the name of the operation that supplied the statistic, and the second
        ``Str`` is the name of the statistic. The value is a multi-indexed
        `pandas.Series`: each level of the index is a facet, and each
        combination of indices is a subset for which the statistic was computed.
        The values of the series, of course, are the values of the computed
        parameters or statistics for each subset.

    channels : List(String)
        The channels that this experiment tracks (read-only).

    conditions : Dict(String : pandas.Series)
        The experimental conditions and analysis groups (gate membership, etc)
        that this experiment tracks.  The key is the name of the condition, and
        the value is a `pandas.Series` with that condition's possible
        values.

    Notes
    -----
    The OOP programmer in me desperately wanted to subclass
    `pandas.DataFrame`, add some flow-specific stuff, and move on with
    my life.  (I may still, with something like
    https://github.com/dalejung/pandas-composition).  A few things get in the
    way of directly subclassing `pandas.DataFrame`:

    - First, to enable some of the delicious syntactic sugar for accessing
      its contents, `pandas.DataFrame` redefines
      ``__getattr__`` and ``__setattr__``, and making it
      recognize (and maintain across copies) additional attributes is an
      unsupported (non-public) API feature and introduces other
      subclassing weirdness.

    - Second, many of the operations (like appending!) don't happen in-place;
      they return copies instead.  It's cleaner to simply manage that copying
      ourselves instead of making the client deal with it.  We can pretend
      to operate on the data in-place.

    To maintain the ease of use, we'll override ``__getitem__`` and pass it
    to the wrapped `pandas.DataFrame`.  We'll do the same with some of
    the more useful `pandas.DataFrame` API pieces (like `query`);
    and of course, you can just get the data frame itself with
    `Experiment.data`.

    Examples
    --------
    >>> import cytoflow as flow
    >>> tube1 = flow.Tube(file = 'cytoflow/tests/data/Plate01/RFP_Well_A3.fcs',
    ...                   conditions = {"Dox" : 10.0})
    >>> tube2 = flow.Tube(file='cytoflow/tests/data/Plate01/CFP_Well_A4.fcs',
    ...                   conditions = {"Dox" : 1.0})
    >>>
    >>> import_op = flow.ImportOp(conditions = {"Dox" : "float"},
    ...                           tubes = [tube1, tube2])
    >>>
    >>> ex = import_op.apply()
    >>> ex.data.shape
    (20000, 17)
    >>> ex.data.groupby(['Dox']).size()
    Dox
    1      10000
    10     10000
    dtype: int64
    """

    # this doesn't play nice with copy.copy() (used if, say, you copy
    # an Experiment with HasTraits.clone_traits()) -- instead, copy
    # a reference when clone_traits() is called, then replace it with
    # using pandas.DataFrame.copy(deep = False)
    data = Instance(pd.DataFrame, args=(), copy = "ref")

    # potentially mutable.  deep copy required
    metadata = Dict(Str, Any, copy = "deep")

    # statistics.  mutable, deep copy required
    statistics = Dict(Tuple(Str, Str), pd.Series, copy = "deep")

    # only the list itself is copied by clone_traits(); the operations in it
    # are shared between clones
    history = List(Any, copy = "shallow")

    # read-only views computed from `data` and `metadata`
    channels = Property(List)
    conditions = Property(Dict)

    def __getitem__(self, key):
        """Forward ``ex[key]`` to the wrapped `pandas.DataFrame`."""
        return self.data[key]

    def __setitem__(self, key, value):
        """
        Forward ``ex[key] = value`` to the wrapped `pandas.DataFrame`.

        If a column named ``key`` already exists it is dropped first, so the
        assignment creates the column afresh.
        """
        if key in self.data:
            self.data.drop(key, axis = 'columns', inplace = True)
        self.data[key] = value

    def __len__(self):
        """Return the number of rows (events) in the underlying `pandas.DataFrame`."""
        return len(self.data)

    def _get_channels(self):
        """Getter for `channels`: sorted names of columns whose metadata ``type`` is ``channel``."""
        return sorted(col for col in self.data
                      if self.metadata[col]['type'] == "channel")

    def _get_conditions(self):
        """
        Getter for `conditions`: maps each column whose metadata ``type`` is
        ``condition`` to a sorted `pandas.Series` of its unique values.
        """
        return {col : pd.Series(self.data[col].unique().copy()).sort_values()
                for col in self.data
                if self.metadata[col]['type'] == "condition"}

    def subset(self, conditions, values):
        """
        Returns a subset of this experiment including only the events where
        each condition in ``conditions`` equals the corresponding value in
        ``values``.

        Parameters
        ----------
        conditions : Str or List(Str)
            A condition or list of conditions

        values : Any or Tuple(Any)
            The value(s) of the condition(s)

        Returns
        -------
        Experiment
            A new `Experiment` containing only the events specified in
            ``conditions`` and ``values``.

        Raises
        ------
        :exc:`.CytoflowError`
            If a name in ``conditions`` is not a condition of this experiment,
            or the matching entry of ``values`` is not one of its values.

        .. note:: This is a wrapper around `pandas.DataFrame.groupby` and
                  `pandas.core.groupby.GroupBy.get_group`.  That means
                  you can pass other things in `conditions` -- see
                  the `pandas.DataFrame.groupby` documentation
                  for details.
        """
        # A bare string is ONE condition name.  It must not be expanded with
        # list(): that would split it into single characters.
        if isinstance(conditions, str):
            pairs = [(conditions, values)]
        else:
            pairs = zip(conditions, values)

        # `self.conditions` is recomputed on every access, so read it once.
        known_conditions = self.conditions
        for c, v in pairs:
            if c not in known_conditions:
                raise util.CytoflowError("{} is not a condition".format(c))
            if v not in list(known_conditions[c]):
                raise util.CytoflowError("{} is not a value of condition {}".format(v, c))

        g = self.data.groupby(conditions)

        ret = self.clone(deep = False)
        ret.data = g.get_group(values).reset_index(drop = True)
        return ret

    def query(self, expr, **kwargs):
        """
        Return an experiment whose data is a subset of this one where ``expr``
        evaluates to ``True``.

        This method "sanitizes" column names first, replacing characters that
        are not valid in a Python identifier with an underscore ``_``. So, the
        column name ``a column`` becomes ``a_column``, and can be queried with
        an ``a_column == True`` or such.

        Parameters
        ----------
        expr : string
            The expression to pass to `pandas.DataFrame.query`.  Must be
            a valid Python expression, something you could pass to `eval`.

        **kwargs : dict
            Other named parameters to pass to `pandas.DataFrame.query`.

        Returns
        -------
        Experiment
            A new `Experiment`, a clone of this one with the data
            returned by `pandas.DataFrame.query`

        Raises
        ------
        :exc:`.CytoflowError`
            If two column names sanitize to the same identifier, or if no
            events match ``expr``.
        """
        resolvers = {}
        # DataFrame.items() replaces iteritems(), which pandas 2.0 removed.
        for name, col in self.data.items():
            new_name = util.sanitize_identifier(name)
            if new_name in resolvers:
                raise util.CytoflowError("Tried to sanitize column name {0} to "
                                         "{1} but it already existed in the "
                                         " DataFrame."
                                         .format(name, new_name))
            resolvers[new_name] = col

        ret = self.clone(deep = False)
        ret.data = (self.data.query(expr, resolvers = ({}, resolvers), **kwargs)
                             .reset_index(drop = True))

        if len(ret.data) == 0:
            raise util.CytoflowError("No events matched {}".format(expr))

        return ret

    def clone(self, deep = True):
        """
        Create a copy of this `Experiment`.  `metadata` and `statistics`
        are deep copies and `history` is a shallow copy; whether
        or not `data` is a deep copy depends on the value of
        the ``deep`` parameter.

        .. warning:: The intent is that ``deep`` is set to ``False`` by
                     operations that are only adding columns to the
                     underlying `pandas.DataFrame`.  This will
                     improve memory performance.  However, the resulting
                     `Experiment` **CANNOT BE MODIFIED IN-PLACE**,
                     because doing so will affect the other `Experiment` s
                     that are clones of the one being modified.

        Parameters
        ----------
        deep : bool (default = True)
            Passed to `pandas.DataFrame.copy` when copying `data`.

        Returns
        -------
        Experiment
            The new `Experiment`.
        """
        # clone_traits() copies `data` by reference (copy = "ref"); replace
        # that reference with a real DataFrame copy.
        new_exp = self.clone_traits()
        new_exp.data = self.data.copy(deep = deep)
        return new_exp

    def add_condition(self, name, dtype, data = None):
        """
        Add a new column of per-event metadata to this `Experiment`.

        .. note::
            `add_condition` operates **in place.**

        There are two places to call `add_condition`.

        - As you're setting up a new `Experiment`, call
          `add_condition` with ``data`` set to ``None`` to specify the
          conditions the new events will have.

        - If you compute some new per-event metadata on an existing
          `Experiment`, call `add_condition` to add it.

        Parameters
        ----------
        name : String
            The name of the new column in `data`.  Must be a valid Python
            identifier: must start with ``[A-Za-z_]`` and contain only the
            characters ``[A-Za-z0-9_]``.

        dtype : String
            The type of the new column in `data`.  Must be a string that
            `pandas.Series` recognizes as a ``dtype``: common types are
            ``category``, ``float``, ``int``, and ``bool``.

        data : pandas.Series (default = None)
            The `pandas.Series` to add to `data`.  Must be the same
            length as `data`, and it must be convertable to a
            `pandas.Series` of type ``dtype``.  If ``None``, will add an
            empty column to the `Experiment` ... but the
            `Experiment` must be empty to do so!

        Raises
        ------
        :exc:`.CytoflowError`
            If ``name`` is not a valid identifier, already exists or
            conflicts with an existing column; if ``data`` is ``None`` but
            the experiment is not empty; or if the `pandas.Series` passed
            in ``data`` isn't the same length as `data`, or isn't
            convertable to type ``dtype``.

        Examples
        --------
        >>> import cytoflow as flow
        >>> ex = flow.Experiment()
        >>> ex.add_condition("Time", "float")
        >>> ex.add_condition("Strain", "category")
        """
        sanitized_name = util.sanitize_identifier(name)
        if name != sanitized_name:
            raise util.CytoflowError("Name '{}' is not a valid Python identifier"
                                     .format(name))

        if name in self.data:
            raise util.CytoflowError("There is already a column named {0} in self.data"
                                     .format(name))

        # Two columns that sanitize to the same identifier cannot be told
        # apart in query(), so reject the new one up front.
        for col in self.data:
            if sanitized_name == util.sanitize_identifier(col):
                raise util.CytoflowError("Name '{}' conflicts with channel or condition {} -- choose another name."
                                         .format(name, col))

        if data is None and len(self) > 0:
            raise util.CytoflowError("If data is None, self.data must be empty!")

        if data is not None and len(self) != len(data):
            raise util.CytoflowError("data must be the same length as self.data")

        try:
            if data is not None:
                self.data[name] = data.astype(dtype, copy = True)
            else:
                self.data[name] = pd.Series(dtype = dtype)
        except (ValueError, TypeError) as exc:
            raise util.CytoflowError("Had trouble converting data to type {0}"
                                     .format(dtype)) from exc

        column = self.data[name]
        column_meta = {'type' : "condition",
                       'dtype' : column.dtype.name}

        # 'values_type' is only recorded for the dtype kinds handled here;
        # any other kind leaves the key unset.
        kind = column.dtype.kind
        if kind == 'b':
            column_meta['values_type'] = 'boolean'
        elif kind in "ifu":
            column_meta['values_type'] = 'numeric'
        elif kind in "OSU":
            column_meta['values_type'] = 'categorical'

        column_meta['values'] = natsorted(column.unique())
        self.metadata[name] = column_meta

    def add_channel(self, name, data = None):
        """
        Add a new column of per-event data (as opposed to metadata) to this
        `Experiment`: ie, something that was measured per cell, or
        derived from per-cell measurements.

        .. note::
            `add_channel` operates *in place*.

        Parameters
        ----------
        name : String
            The name of the new column to be added to `data`.

        data : pandas.Series
            The `pandas.Series` to add to `data`.  Must be the same
            length as `data`, and it must be convertable to a
            dtype of ``float64``.  If ``None``, will add an empty column to
            the `Experiment` ... but the `Experiment` must be
            empty to do so!

        Raises
        ------
        :exc:`.CytoflowError`
            If ``name`` already exists or conflicts with an existing column;
            if ``data`` is ``None`` but the experiment is not empty; or if
            the `pandas.Series` passed in ``data`` isn't the same length
            as `data`, or isn't convertable to a dtype ``float64``.

        Examples
        --------
        >>> ex.add_channel("FSC_over_2", ex.data["FSC-A"] / 2.0)
        """
        if name in self.data:
            raise util.CytoflowError("Already a column named {0} in self.data"
                                     .format(name))

        # Two columns that sanitize to the same identifier cannot be told
        # apart in query(), so reject the new one up front.
        sanitized_name = util.sanitize_identifier(name)
        for col in self.data:
            if sanitized_name == util.sanitize_identifier(col):
                raise util.CytoflowError("Name '{}' conflicts with channel or condition {} -- choose another name."
                                         .format(name, col))

        if data is None and len(self) > 0:
            raise util.CytoflowError("If data is None, self.data must be empty!")

        if data is not None and len(self) != len(data):
            raise util.CytoflowError("data must be the same length as self.data")

        try:
            if data is not None:
                self.data[name] = data.astype("float64", copy = True)
            else:
                self.data[name] = pd.Series(dtype = "float64")
        except (ValueError, TypeError) as exc:
            raise util.CytoflowError('Had trouble converting data to type "float64"') from exc

        self.metadata[name] = {'type' : "channel"}

    def add_events(self, data, conditions):
        """
        Add new events to this `Experiment`.  `add_events`
        operates **in place**, modifying the `Experiment` object
        that it's called on.

        Each new event in ``data`` is appended to `data`, and its
        per-event metadata columns will be set with the values specified in
        ``conditions``.  Thus, it is particularly useful for adding tubes of
        data to new experiments, before additional per-event metadata is added
        by gates, etc.

        .. note::
            *Every* column in `data` must be accounted for.  Each column
            of type ``channel`` must appear in ``data``; each column of
            metadata must have a key:value pair in ``conditions``.

        Parameters
        ----------
        data : pandas.DataFrame
            A single tube or well's worth of data.  Must be a DataFrame with
            the same columns as `channels`

        conditions : Dict(Str, Any)
            A dictionary of the tube's metadata.  The keys must match
            `conditions`, and the values must be coercable to the
            relevant ``numpy`` dtype.

        Raises
        ------
        :exc:`.CytoflowError`
            `add_events` pukes if:

            * the experiment already has events and the columns in ``data``
              are not exactly the experiment's channels.
            * there are keys in ``conditions`` that aren't conditions in
              the experiment, or vice versa.

            Errors raised by ``pandas`` while converting a value in
            ``conditions`` to the condition's ``dtype`` are not wrapped.

        Examples
        --------
        >>> import cytoflow as flow
        >>> from fcsparser import fcsparser
        >>> ex = flow.Experiment()
        >>> ex.add_condition("Time", "float")
        >>> ex.add_condition("Strain", "category")
        >>> _, tube1 = fcsparser.parse('CFP_Well_A4.fcs')
        >>> _, tube2 = fcsparser.parse('RFP_Well_A3.fcs')
        >>> ex.add_events(tube1, {"Time" : 1, "Strain" : "BL21"})
        >>> ex.add_events(tube2, {"Time" : 1, "Strain" : "Top10G"})
        """
        # make sure the new tube's channels match the rest of the
        # channels in the Experiment
        if len(self) > 0 and set(data.columns) != set(self.channels):
            raise util.CytoflowError("New events don't have the same channels")

        # check that the conditions for this tube exist in the experiment
        # already.  `self.conditions` is recomputed on every access, so read
        # it once; nothing below changes a condition's dtype before the
        # events are appended.
        known_conditions = self.conditions
        if set(conditions) != set(known_conditions):
            raise util.CytoflowError("Metadata for this tube should be {}"
                                     .format(list(known_conditions.keys())))

        # add the conditions to tube's internal data frame.  specify the
        # conditions dtype using self.conditions.
        # take this chance to up-convert the float32s to float64.
        # TODO - the FCS standard says you can specify the precision.
        # check with int/float/double files!
        new_data = data.astype("float64", copy = True)

        for meta_name, meta_value in conditions.items():
            meta_type = known_conditions[meta_name].dtype
            is_categorical = isinstance(meta_type, CategoricalDtype)

            if is_categorical:
                # the new column starts with just this tube's value as its
                # only category; categories are merged below
                meta_type = CategoricalDtype([meta_value])

            new_data[meta_name] = pd.Series(data = [meta_value] * len(new_data),
                                            index = new_data.index,
                                            dtype = meta_type)

            # if we're categorical, merge the categories so both frames
            # share the same category set before they are concatenated
            if is_categorical and meta_name in self.data:
                cats = sorted(set(self.data[meta_name].cat.categories)
                              | set(new_data[meta_name].cat.categories))
                self.data[meta_name] = self.data[meta_name].cat.set_categories(cats)
                new_data[meta_name] = new_data[meta_name].cat.set_categories(cats)

        # pd.concat replaces DataFrame.append, which pandas 2.0 removed
        self.data = pd.concat([self.data, new_data], ignore_index = True, sort = True)
        del new_data

        # update the metadata 'values'
        for condition in self.conditions:
            self.metadata[condition]['values'] = natsorted(self.data[condition].unique())
if __name__ == "__main__":
    # Ad-hoc smoke test: build an Experiment from two FCS files.
    from fcsparser import fcsparser

    ex = Experiment()
    ex.add_condition("time", "category")

    # NOTE(review): fcsparser.parse is expected to return (metadata, events);
    # confirm against the installed fcsparser version.
    _, tube1 = fcsparser.parse('../cytoflow/tests/data/tasbe/beads.fcs')
    _, tube2 = fcsparser.parse('../cytoflow/tests/data/Plate01/RFP_Well_A3.fcs')

    # Every column needs a metadata entry before events are added, because
    # add_events() looks up metadata[column]['type'] for each column when it
    # refreshes the condition values.
    for channel in tube1.columns:
        ex.add_channel(channel)

    ex.add_events(tube1, {"time" : "one"})
    ex.add_events(tube2, {"time" : "two"})