#!/usr/bin/env python3.4
# coding: latin-1
# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2019
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
cytoflow.operations.import_op
-----------------------------
'''
import warnings, math
from traits.api import (HasTraits, HasStrictTraits, provides, Str, List, Any,
Dict, File, Constant, Enum, Int)
import fcsparser
import numpy as np
from pathlib import Path
import cytoflow.utility as util
from ..experiment import Experiment
from .i_operation import IOperation
# override fcsparser's broken fromfile
import numpy
def _fromfile(file, dtype, count, *args, **kwargs):
dtypes = dtype.split(',')
field_width = []
for dt in dtypes:
num_bytes = int(dt[2:])
field_width.append(num_bytes)
try:
ret = numpy.fromfile(file,
dtype=",".join(['u1'] * sum(field_width)),
count=count,
*args,
**kwargs)
except (TypeError, IOError):
ret = numpy.frombuffer(file.read(count * sum(field_width)),
dtype=",".join(['u1'] * sum(field_width)),
count=count,
*args,
**kwargs)
ret = ret.view('u1').reshape((count, sum(field_width)))
ret_dtypes = []
for field, dt in enumerate(dtypes):
dtype_type = dt[1]
dtype_endian = dt[0]
num_bytes = int(dt[2:])
while num_bytes & (num_bytes - 1) != 0:
ret = np.insert(ret, sum(field_width[0:field]), np.zeros(count), axis = 1)
num_bytes = num_bytes + 1
ret_dtypes.append(dtype_endian + dtype_type + str(num_bytes))
return ret.view(','.join(ret_dtypes)).ravel()
fcsparser.api.fromfile = _fromfile
[docs]class Tube(HasTraits):
"""
Represents a tube or plate well we want to import.
Attributes
----------
file : File
The file name of the FCS file to import
conditions : Dict(Str, Any)
A dictionary containing this tube's experimental conditions. Keys
are condition names, values are condition values.
Examples
--------
>>> tube1 = flow.Tube(file = 'RFP_Well_A3.fcs', conditions = {"Dox" : 10.0})
>>> tube2 = flow.Tube(file='CFP_Well_A4.fcs', conditions = {"Dox" : 1.0})
"""
# the file name for the FCS file to import
file = File
# a dict of experimental conditions: name --> value
conditions = Dict(Str, Any)
[docs] def conditions_equal(self, other):
return other and len(set(self.conditions.items()) ^
set(other.conditions.items())) == 0
def __eq__(self, other):
return other and (self.file == other.file and
self.conditions == other.conditions)
def __hash__(self):
return hash((self.file,
(frozenset(self.conditions.keys()),
frozenset(self.conditions.values()))))
[docs]@provides(IOperation)
class ImportOp(HasStrictTraits):
"""
An operation for importing data and making an :class:`.Experiment`.
To use, set the :attr:`conditions` dict to a mapping between condition name
and NumPy ``dtype``. Useful dtypes include ``category``, ``float``,
``int``, ``bool``.
Next, set :attr:`tubes` to a list of :class:`Tube` containing FCS filenames
and the corresponding conditions.
If you would rather not analyze every single event in every FCS file,
set :attr:`events` to the number of events from each FCS file you want to
load.
Call :meth:`apply` to load the data. The usual ``experiment`` parameter
can be ``None``.
Attributes
----------
conditions : Dict(Str, Str)
A dictionary mapping condition names (keys) to NumPy ``dtype``s (values).
Useful ``dtype``s include ``category``, ``float``, ``int``, and ``bool``.
tubes : List(Tube)
A list of :class:``Tube`` instances, which map FCS files to their corresponding
experimental conditions. Each :class:``Tube`` must have a
:attr:``~Tube.conditions`` dict whose keys match those of
:attr:`conditions`.
channels : Dict(Str, Str)
If you only need a subset of the channels available in the data set,
specify them here. Each ``(key, value)`` pair specifies a channel to
include in the output experiment. The key is the channel name in the
FCS file, and the value is the name of the channel in the Experiment.
You can use this to rename channels as you import data (because flow
channel names are frequently not terribly informative.) New channel
names must be valid Python identifiers: start with a letter or ``_``, and
all characters must be letters, numbers or ``_``. If :attr:`channels` is
empty, load all channels in the FCS files.
events : Int
If not None, import only a random subset of events of size :attr:`events`.
Presumably the analysis will go faster but less precisely; good for
interactive data exploration. Then, unset :attr:`events` and re-run
the analysis non-interactively.
name_metadata : {None, "$PnN", "$PnS"} (default = None)
Which FCS metadata is the channel name? If ``None``, attempt to
autodetect.
data_set : Int (default = 0)
The FCS standard allows you to encode multiple data sets in a single
FCS file. Some software (such as the Beckman-Coulter software)
also encode the same data in two different formats -- for example,
FCS2.0 and FCS3.0. To access a data set other than the first one,
set :attr:`data_set` to the 0-based index of the data set you
would like to use. This will be used for *all FCS files imported by
this operation.*
ignore_v : List(Str)
:class:`cytoflow` is designed to operate on an :class:`.Experiment` containing
tubes that were all collected under the same instrument settings.
In particular, the same PMT voltages ensure that data can be
compared across samples.
*Very rarely*, you may need to set up an :class:`.Experiment` with
different voltage settings on different :class:`Tube`s. This is likely
only to be the case when you are trying to figure out which voltages
should be used in future experiments. If so, set :attr:`ignore_v` to a
:class:`List` of channel names to ignore particular channels.
.. warning::
THIS WILL BREAK REAL EXPERIMENTS
Examples
--------
>>> tube1 = flow.Tube(file = 'RFP_Well_A3.fcs', conditions = {"Dox" : 10.0})
>>> tube2 = flow.Tube(file='CFP_Well_A4.fcs', conditions = {"Dox" : 1.0})
>>> import_op = flow.ImportOp(conditions = {"Dox" : "float"},
... tubes = [tube1, tube2])
>>> ex = import_op.apply()
"""
id = Constant("edu.mit.synbio.cytoflow.operations.import")
friendly_id = Constant("Import")
name = Constant("Import Data")
# experimental conditions: name --> dtype.
conditions = Dict(Str, Str)
# the tubes
tubes = List(Tube)
# which channels do we import?
channels = Dict(Str, Str)
# which FCS metadata has the channel names in it?
name_metadata = Enum(None, "$PnN", "$PnS")
# which data set to get out of the FCS files?
data_set = Int(0)
# are we subsetting?
events = util.CIntOrNone(None)
coarse_events = util.Deprecated(new = 'events')
# DON'T DO THIS
ignore_v = List(Str)
[docs] def apply(self, experiment = None, metadata_only = False):
"""
Load a new :class:`.Experiment`.
Parameters
----------
experiment : Experiment
Ignored
metadata_only : bool (default = False)
Only "import" the metadata, creating an Experiment with all the
expected metadata and structure but 0 events.
Returns
-------
Experiment
The new :class:`.Experiment`. New channels have the following
metadata:
- **voltage** - int
The voltage that this channel was collected at. Determined
by the ``$PnV`` field from the first FCS file.
- **range** - int
The maximum range of this channel. Determined by the ``$PnR``
field from the first FCS file.
New experimental conditions do not have **voltage** or **range**
metadata, obviously. Instead, they have **experiment** set to
``True``, to distinguish the experimental variables from the
conditions that were added by gates, etc.
If :attr:`ignore_v` is set, it is added as a key to the
:class:`.Experiment`-wide metadata.
"""
if not self.tubes or len(self.tubes) == 0:
raise util.CytoflowOpError('tubes',
"Must specify some tubes!")
# if we have channel renaming, make sure the new names are valid
# python identifiers
if self.channels:
for old_name, new_name in self.channels.items():
if old_name != new_name and new_name != util.sanitize_identifier(new_name):
raise util.CytoflowOpError('channels',
"Channel name {} must be a "
"valid Python identifier."
.format(new_name))
# make sure each tube has the same conditions
tube0_conditions = set(self.tubes[0].conditions)
for tube in self.tubes:
tube_conditions = set(tube.conditions)
if len(tube0_conditions ^ tube_conditions) > 0:
raise util.CytoflowOpError('tubes',
"Tube {0} didn't have the same "
"conditions as tube {1}"
.format(tube.file, self.tubes[0].file))
# make sure experimental conditions are unique
for idx, i in enumerate(self.tubes[0:-1]):
for j in self.tubes[idx+1:]:
if i.conditions_equal(j):
raise util.CytoflowOpError('tubes',
"The same conditions specified for "
"tube {0} and tube {1}"
.format(i.file, j.file))
experiment = Experiment()
experiment.metadata["ignore_v"] = self.ignore_v
for condition, dtype in list(self.conditions.items()):
experiment.add_condition(condition, dtype)
experiment.metadata[condition]['experiment'] = True
try:
# silence warnings about duplicate channels;
# we'll figure that out below
with warnings.catch_warnings():
warnings.simplefilter("ignore")
tube0_meta = fcsparser.parse(self.tubes[0].file,
data_set = self.data_set,
meta_data_only = True,
reformat_meta = True)
except Exception as e:
raise util.CytoflowOpError('tubes',
"FCS reader threw an error reading metadata "
"for tube {}: {}"
.format(self.tubes[0].file, str(e))) from e
meta_channels = tube0_meta["_channels_"]
if self.name_metadata:
experiment.metadata["name_metadata"] = self.name_metadata
else:
experiment.metadata["name_metadata"] = autodetect_name_metadata(self.tubes[0].file,
data_set = self.data_set)
meta_channels['Index'] = meta_channels.index
meta_channels.set_index(experiment.metadata["name_metadata"],
inplace = True)
channels = list(self.channels.keys()) if self.channels \
else list(meta_channels.index.values)
# make sure everything in self.channels is in the tube channels
for channel in channels:
if channel not in meta_channels.index:
raise util.CytoflowOpError('channels',
"Channel {0} not in tube {1}"
.format(channel, self.tubes[0].file))
# now that we have the metadata, load it into experiment
for channel in channels:
experiment.add_channel(channel)
experiment.metadata[channel]["fcs_name"] = channel
# keep track of the channel's PMT voltage
if("$PnV" in meta_channels.loc[channel]):
v = meta_channels.loc[channel]['$PnV']
if v: experiment.metadata[channel]["voltage"] = v
# add the maximum possible value for this channel.
data_range = meta_channels.loc[channel]['$PnR']
data_range = float(data_range)
experiment.metadata[channel]['range'] = data_range
experiment.metadata['fcs_metadata'] = {}
for tube in self.tubes:
if metadata_only:
tube_meta, tube_data = parse_tube(tube.file,
experiment,
data_set = self.data_set,
metadata_only = True)
else:
tube_meta, tube_data = parse_tube(tube.file,
experiment,
data_set = self.data_set)
if self.events:
if self.events <= len(tube_data):
tube_data = tube_data.loc[np.random.choice(tube_data.index,
self.events,
replace = False)]
else:
warnings.warn("Only {0} events in tube {1}"
.format(len(tube_data), tube.file),
util.CytoflowWarning)
experiment.add_events(tube_data[channels], tube.conditions)
# extract the row and column from wells collected on a
# BD HTS
if 'WELL ID' in tube_meta:
pos = tube_meta['WELL ID']
tube_meta['CF_Row'] = pos[0]
tube_meta['CF_Col'] = int(pos[1:3])
for i, channel in enumerate(channels):
# remove the PnV tube metadata
if '$P{}V'.format(i+1) in tube_meta:
del tube_meta['$P{}V'.format(i+1)]
# work around a bug where the PnR is sometimes not the detector range
# but the data range.
pnr = '$P{}R'.format(i+1)
if pnr in tube_meta and float(tube_meta[pnr]) > experiment.metadata[channel]['range']:
experiment.metadata[channel]['range'] = float(tube_meta[pnr])
tube_meta['CF_File'] = Path(tube.file).stem
experiment.metadata['fcs_metadata'][tube.file] = tube_meta
# import sys;sys.path.append(r'/home/brian/.p2/pool/plugins/org.python.pydev_6.1.0.201711051306/pysrc')
# import pydevd;pydevd.settrace()
for channel in channels:
if self.channels and channel in self.channels:
new_name = self.channels[channel]
if channel == new_name:
continue
experiment.data.rename(columns = {channel : new_name}, inplace = True)
experiment.metadata[new_name] = experiment.metadata[channel]
experiment.metadata[new_name]["fcs_name"] = channel
del experiment.metadata[channel]
# this catches an odd corner case where some instruments store
# instrument-specific info in the "extra" bits. we have to
# clear them out.
if tube0_meta['$DATATYPE'] == 'I':
data_bits = int(meta_channels.loc[channel]['$PnB'])
data_range = float(meta_channels.loc[channel]['$PnR'])
range_bits = int(math.log(data_range, 2))
if range_bits < data_bits:
mask = 1
for _ in range(1, range_bits):
mask = mask << 1 | 1
experiment.data[channel] = experiment.data[channel].values.astype('int') & mask
# re-scale the data to linear if if's recorded as log-scaled with
# integer channels
data_range = float(meta_channels.loc[channel]['$PnR'])
f1 = float(meta_channels.loc[channel]['$PnE'][0])
f2 = float(meta_channels.loc[channel]['$PnE'][1])
if f1 > 0.0 and f2 == 0.0:
warnings.warn('Invalid $PnE = {},{} for channel {}, changing it to {},1.0'
.format(f1, f2, channel, f1),
util.CytoflowWarning)
f2 = 1.0
if f1 > 0.0 and f2 > 0.0 and tube0_meta['$DATATYPE'] == 'I':
warnings.warn('Converting channel {} from logarithmic to linear'
.format(channel),
util.CytoflowWarning)
# experiment.data[channel] = 10 ** (f1 * experiment.data[channel] / data_range) * f2
return experiment
[docs]def check_tube(filename, experiment, data_set = 0):
if experiment is None:
raise util.CytoflowError("No experiment specified")
ignore_v = experiment.metadata['ignore_v']
try:
tube_meta = fcsparser.parse( filename,
channel_naming = experiment.metadata["name_metadata"],
data_set = data_set,
meta_data_only = True,
reformat_meta = True)
except Exception as e:
raise util.CytoflowError("FCS reader threw an error reading metadata "
"for tube {0}"
.format(filename)) from e
# first make sure the tube has the right channels
if not set([experiment.metadata[c]["fcs_name"] for c in experiment.channels]) <= set(tube_meta["_channel_names_"]):
raise util.CytoflowError("Tube {0} doesn't have the same channels"
.format(filename))
tube_channels = tube_meta["_channels_"]
tube_channels.set_index(experiment.metadata["name_metadata"],
inplace = True)
# next check the per-channel parameters
for channel in experiment.channels:
fcs_name = experiment.metadata[channel]["fcs_name"]
# first check voltage
if "voltage" in experiment.metadata[channel]:
if not "$PnV" in tube_channels.loc[fcs_name]:
raise util.CytoflowError("Didn't find a voltage for channel {0}" \
"in tube {1}".format(channel, filename))
old_v = experiment.metadata[channel]["voltage"]
new_v = tube_channels.loc[fcs_name]['$PnV']
if old_v != new_v and not channel in ignore_v:
raise util.CytoflowError("Tube {0} doesn't have the same voltages"
.format(filename))
# TODO check the delay -- and any other params?
# module-level, so we can reuse it in other modules
[docs]def parse_tube(filename, experiment = None, data_set = 0, metadata_only = False):
if experiment:
check_tube(filename, experiment)
name_metadata = experiment.metadata["name_metadata"]
else:
name_metadata = '$PnS'
try:
if metadata_only:
tube_data = None
with warnings.catch_warnings():
warnings.simplefilter("ignore")
tube_meta = fcsparser.parse(
filename,
meta_data_only = True,
data_set = data_set,
channel_naming = name_metadata)
else:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
tube_meta, tube_data = fcsparser.parse(
filename,
meta_data_only = metadata_only,
data_set = data_set,
channel_naming = name_metadata)
except Exception as e:
raise util.CytoflowError("FCS reader threw an error reading data for tube {}"
.format(filename)) from e
del tube_meta['__header__']
return tube_meta, tube_data