#!/usr/bin/env python3.8
# coding: latin-1
# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
cytoflow.views.export_fcs
-------------------------
A "view" that exports events as FCS files.
`ExportFCS` -- the `IView` class that does the exporting.
"""
import re
from pathlib import Path
from copy import copy
from traits.api import (Constant, List, Str, Bool, Dict, Directory,
HasStrictTraits)
import cytoflow.utility as util
class ExportFCS(HasStrictTraits):
    """
    Exports events as FCS files.

    This isn't a traditional view, in that it doesn't implement `plot`.
    Instead, use `enum_files` to figure out which files will be created
    from a particular experiment, and `export` to export the FCS files.

    The Cytoflow attributes will be encoded in keywords in the FCS TEXT
    segment, starting with the characters ``CF_``. Any FCS keywords that
    are the same across all the input files will also be included.

    Attributes
    ----------
    base : Str
        The prefix of the FCS filenames

    path : Directory
        The directory to export to.

    by : List(Str)
        A list of conditions from `Experiment.conditions`; each unique
        combination of conditions will be exported to an FCS file.

    keywords : Dict(Str, Str)
        If you want to add more keywords to the FCS files' TEXT segment,
        specify them here.

    subset : str
        A Python expression used to select a subset of the data

    Examples
    --------

    Make a little data set.

    >>> import cytoflow as flow
    >>> import_op = flow.ImportOp()
    >>> import_op.tubes = [flow.Tube(file = "Plate01/RFP_Well_A3.fcs",
    ...                              conditions = {'Dox' : 10.0}),
    ...                    flow.Tube(file = "Plate01/CFP_Well_A4.fcs",
    ...                              conditions = {'Dox' : 1.0})]
    >>> import_op.conditions = {'Dox' : 'float'}
    >>> ex = import_op.apply()

    Export the data

    >>> import tempfile
    >>> flow.ExportFCS(path = 'export/',
    ...                by = ["Dox"],
    ...                subset = "Dox == 10.0").export(ex)
    """

    # traits
    id = Constant("edu.mit.synbio.cytoflow.view.exportfcs")
    # NOTE(review): "Table View" looks copied from another view; left as-is
    # because other code may display or match on this string -- confirm.
    friendly_id = Constant("Table View")

    base = Str
    path = Directory(exists = True)
    by = List(Str)
    keywords = Dict(Str, Str)
    subset = Str

    # When True each file-name part is "<condition>_<value>"; when False it
    # is just "<value>".
    _include_by = Bool(True)

    @staticmethod
    def _group_values(key):
        """
        Normalize a ``groupby`` key to a tuple with one entry per condition.

        Depending on the pandas version, grouping by a one-element list
        yields either a bare scalar or a 1-tuple; grouping by several
        columns always yields a tuple.
        """
        return key if isinstance(key, tuple) else (key,)

    @staticmethod
    def _make_filename(base, by, include_by, values):
        """
        Build the ``.fcs`` file name for one group.

        ``values`` holds the group's condition values in the same order as
        the condition names in ``by``.
        """
        if include_by:
            parts = [name + '_' + str(value) for name, value in zip(by, values)]
        else:
            parts = [str(value) for _, value in zip(by, values)]

        stem = '_'.join(parts)
        if base:
            stem = base + '_' + stem
        return stem + '.fcs'

    def _apply_subset(self, experiment):
        """
        Return ``experiment`` restricted by `subset`.

        If `subset` is empty the experiment is returned unchanged.

        Raises
        ------
        CytoflowViewError
            If the subset expression fails to evaluate or selects no events.
        """
        if not self.subset:
            return experiment

        try:
            experiment = experiment.query(self.subset)
        except util.CytoflowError as e:
            raise util.CytoflowViewError('subset', str(e)) from e
        except Exception as e:
            raise util.CytoflowViewError('subset',
                                         "Subset string '{0}' isn't valid"
                                         .format(self.subset)) from e

        if len(experiment) == 0:
            raise util.CytoflowViewError('subset',
                                         "Subset string '{0}' returned no events"
                                         .format(self.subset))
        return experiment

    def enum_files(self, experiment):
        """
        Return an iterator over the file names that this export module will
        produce from a given experiment.

        Validation of `by` and `subset` happens immediately when this method
        is called; the file names themselves are produced lazily.

        Parameters
        ----------
        experiment : Experiment
            The `Experiment` to export

        Raises
        ------
        CytoflowViewError
            If ``experiment`` is ``None``, `by` is empty, or `subset` is
            invalid or selects no events.

        CytoflowOpError
            If an entry of `by` is not one of the experiment's conditions.
        """
        if experiment is None:
            raise util.CytoflowViewError('experiment', "No experiment specified")

        if len(self.by) == 0:
            raise util.CytoflowViewError('by',
                                         "You must specify some variables in `by`")

        for b in self.by:
            if b not in experiment.conditions:
                # NOTE(review): every other error in this class is a
                # CytoflowViewError; the type is kept because callers may
                # catch it.
                raise util.CytoflowOpError('by',
                                           "Aggregation metadata {} not found, "
                                           "must be one of {}"
                                           .format(b, experiment.conditions))

        experiment = self._apply_subset(experiment)

        # Snapshot the settings so the names do not change if the traits are
        # edited while the iterator is being consumed.
        by = list(self.by)
        base = self.base
        include_by = self._include_by
        group_values = self._group_values
        make_filename = self._make_filename

        # The outermost iterable of a generator expression is evaluated
        # right away, so grouping errors surface here, not on first next().
        return (make_filename(base, by, include_by, group_values(key))
                for key, _ in experiment.data.groupby(by))

    def export(self, experiment):
        """
        Export FCS files from an experiment.

        One file is written into `path` for each unique combination of the
        conditions in `by`. Nothing is written if any target file already
        exists.

        Parameters
        ----------
        experiment : Experiment
            The `Experiment` to export

        Raises
        ------
        CytoflowViewError
            If the experiment is missing or empty, `path` is unset or not a
            directory, a target file already exists, or `by` / `subset` are
            invalid (see `enum_files`).
        """
        if experiment is None:
            raise util.CytoflowViewError('experiment', "No experiment specified")

        if len(experiment) == 0:
            raise util.CytoflowViewError('experiment', "No events in experiment")

        if not self.path:
            raise util.CytoflowViewError('path',
                                         'Must specify an output directory')

        d = Path(self.path)

        if not d.is_dir():
            raise util.CytoflowViewError('path',
                                         'Output directory {} must exist'
                                         .format(d))

        # also tests for good experiment, self.by
        for filename in self.enum_files(experiment):
            p = d / filename
            if p.is_file():
                raise util.CytoflowViewError('path',
                                             'File {} already exists'
                                             .format(p))

        experiment = self._apply_subset(experiment)

        # Keywords that describe the layout of the original files; the
        # $PnX per-parameter keywords are filtered out by the regex below.
        exclude_keywords = ['$BEGINSTEXT', '$ENDSTEXT', '$BEGINANALYSIS',
                            '$ENDANALYSIS', '$BEGINDATA', '$ENDDATA',
                            '$BYTEORD', '$DATATYPE', '$MODE', '$NEXTDATA',
                            '$TOT', '$PAR']
        per_param_keyword = re.compile(r'^\$P\d+[BENRDSG]$')

        # Start from the first tube's keywords, then keep only those that
        # every other tube also has with the same (stringified) value.
        tube_metadata = list(experiment.metadata['fcs_metadata'].values())
        common_metadata = {}
        if tube_metadata:
            common_metadata = {str(k) : str(v)
                               for k, v in tube_metadata[0].items()
                               if per_param_keyword.search(k) is None
                               and k not in exclude_keywords}

        for metadata in tube_metadata[1:]:
            for name in list(common_metadata):
                # Compare as strings: the stored values were stringified
                # above, so a raw comparison would reject equal non-string
                # values.
                if (name not in metadata
                        or str(metadata[name]) != common_metadata[name]):
                    del common_metadata[name]

        for i, channel in enumerate(experiment.channels):
            if 'voltage' in experiment.metadata[channel]:
                common_metadata['$P{}V'.format(i + 1)] = experiment.metadata[channel]['voltage']

        by = list(self.by)
        channels = experiment.channels
        ranges = {c: experiment.metadata[c]['range'] for c in channels}

        for key, data_subset in experiment.data.groupby(by):
            values = self._group_values(key)

            # Keywords taken from the input files override user-supplied
            # keywords of the same name.
            kws = dict(self.keywords)
            kws.update(common_metadata)
            kws = {k : str(v) for k, v in kws.items()}

            for name, value in zip(by, values):
                kws["CF_" + name] = str(value)

            filename = self._make_filename(self.base, by,
                                           self._include_by, values)
            full_path = d / filename

            util.write_fcs(str(full_path),
                           channels,
                           ranges,
                           data_subset[channels].values,
                           compat_chn_names = False,
                           compat_negative = False,
                           **kws)