Source code for cytoflowgui.workflow.workflow_item

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflowgui.workflow.workflow_item
----------------------------------

Represents one step in an analysis pipeline.  Wraps a single 
`IOperation` and any `IView` of its result.  
"""

import sys, logging, warnings, threading

import pandas as pd
import matplotlib.pyplot as plt

from traits.api import (HasStrictTraits, Instance, Str, Enum, Any, Dict, 
                        Tuple, List, DelegatesTo, ComparisonMode, Property,
                        observe, cached_property)

from cytoflow import Experiment
from cytoflow.utility import CytoflowError, CytoflowOpError, CytoflowViewError

from .serialization import camel_registry

# http://stackoverflow.com/questions/1977362/how-to-create-module-wide-variables-in-python
this = sys.modules[__name__]
this.last_view_plotted = None

logger = logging.getLogger(__name__)

[docs]class WorkflowItem(HasStrictTraits): """ The basic unit of a Workflow: wraps an operation and a list of views. This class is serialized and synchronized between the `LocalWorkflow` and the `RemoteWorkflow`. Notes ----- Because we serialize instances of this, we have to pay careful attention to which traits are ``transient`` (and aren't serialized). Additionally, traits marked as ``status`` are only serialized remote --> local. For more details about the synchronization, see the module docstring for `cytoflowgui.workflow` """ friendly_id = DelegatesTo('operation') """The operation's id""" name = DelegatesTo('operation') """The operation's name""" operation = Instance('cytoflowgui.workflow.operations.IWorkflowOperation', copy = "ref") """The operation that this `WorkflowItem` wraps""" # the IViews associated with this operation views = List(Instance('cytoflowgui.workflow.views.IWorkflowView'), copy = "ref", comparison_mode = ComparisonMode.identity) """The `IView`\'s associated with this operation""" current_view = Instance('cytoflowgui.workflow.views.IWorkflowView', copy = "ref") """The currently selected view""" result = Instance(Experiment, transient = True) """The `Experiment` that is the result of applying `operation` to the `previous_wi`'s `result`""" # the channels, conditions and statistics from result. usually these would be # Properties (ie, determined dynamically), but that's hard with the # multiprocess model. channels = List(Str, status = True) """The channels from `result`""" conditions = Dict(Str, pd.Series, status = True) """The conditions from `result`""" metadata = Dict(Str, Any, status = True) """The metadata from `result`""" statistics = Dict(Tuple(Str, Str), pd.Series, status = True) """The statistics from `result`""" default_view = Property(Instance('cytoflowgui.workflow.views.IWorkflowView'), observe = 'operation') """The default view for this workflow item (if any)""" previous_wi = Instance('WorkflowItem', transient = True) """The previous `WorkflowItem` in the workflow""" next_wi = Instance('WorkflowItem', transient = True) """The next `WorkflowItem` in the workflow""" # the workflow that we're a part of. need to make klass = HasStrictTraits because # we could be an instance of either LocalWorkflow or RemoteWorkflow workflow = Instance(HasStrictTraits, transient = True) """ The `LocalWorkflow` or `RemoteWorkflow` that this `WorkflowItem` is a part of """ # MAGIC: first value is the default status = Enum("invalid", "waiting", "estimating", "applying", "valid", "loading", status = True) """This `WorkflowItem`'s status""" # report the errors and warnings op_error = Str(status = True) """Errors from `operation`'s `IOperation.apply` method""" op_error_trait = Str(status = True) """The trait that caused the error in `op_error`""" op_warning = Str(status = True) """Warnings from `operation`'s `IOperation.apply` method""" op_warning_trait = Str(status = True) """The trait that caused the warning in `op_warning`""" estimate_error = Str(status = True) """Errors from `operation`'s `IOperation.estimate` method""" estimate_warning = Str(status = True) """Warnings from `operation`'s `IOperation.estimate` method""" view_error = Str(status = True) """Errors from the most recently plotted view's `IView.plot` method""" view_error_trait = Str(status = True) """The trait that caused the error in `view_error`""" view_warning = Str(status = True) """Warnings from the most recently plotted view's `IView.plot` method""" view_warning_trait = Str(status = True) """The trait that caused the warning in `view_warning`""" plot_names = List(Any, status = True) """ The possible values for the **plot_name** parameter of `current_view`'s `IView.plot` method. Retrieved from that view's **enum_plots()** method and updated automatically when `result` or `current_view` changes. """ plot_names_label = Str(status = True) """ The GUI label for the element that allows users to select a plot name from `plot_names`. Updated automatically when `result` or `current_view` changes. """ # synchronization primitives for plotting matplotlib_events = Any(transient = True) """`threading.Event` to synchronize matplotlib plotting across process boundaries""" plot_lock = Any(transient = True) """`threading.Lock` to synchronize matplotlib plotting across process boundaries""" lock = Instance(threading.RLock, (), transient = True) """`threading.Lock` for updating this `WorkflowItem`'s traits"""
[docs] def edit_traits(self, view = None, parent = None, kind = None, context = None, handler = None, id = "", # @ReservedAssignment scrollable=None, **args): """ Override the base `traits.has_traits.HasTraits.edit_traits` to make it go looking for views in the handler. """ if context is None: context = self view = self.trait_view(view, handler = handler) return view.ui(context, parent, kind, self.trait_view_elements(), handler, id, scrollable, args)
[docs] def trait_view(self, name = None, view_element = None, handler = None): return self.__class__._trait_view( name, view_element, self.default_traits_view, self.trait_view_elements, self.visible_traits, handler if handler else self)
def __str__(self): return "<{}: {}>".format(self.__class__.__name__, self.operation.__class__.__name__) def __repr__(self): return "<{}: {}>".format(self.__class__.__name__, self.operation.__class__.__name__) # property: default_view @cached_property def _get_default_view(self): try: return self.operation.default_view() except AttributeError: return None @observe('[result,current_view.+type]') def _update_plot_names(self, _): if self.current_view is None: return if self.result: experiment = self.result elif self.previous_wi and self.previous_wi.result: experiment = self.previous_wi.result else: return None plot_iter = self.current_view.enum_plots(experiment) plot_names = [x for x in plot_iter] if plot_names == self.plot_names: return if plot_names == [None] or plot_names == []: self.plot_names = [] self.plot_names_label = "" else: self.plot_names = plot_names self.plot_names_label = ", ".join(plot_iter.by)
[docs] def estimate(self): """Call `operation`'s `IOperation.estimate`""" logger.debug("WorkflowItem.estimate :: {}".format((self))) prev_result = self.previous_wi.result if self.previous_wi else None with warnings.catch_warnings(record = True) as w: try: self.status = "estimating" try: plt.gcf().canvas.set_working(True) except AttributeError: pass self.operation.estimate(prev_result) self.estimate_error = "" if w: self.estimate_warning = w[-1].message.__str__() else: self.estimate_warning = "" return True except CytoflowOpError as e: if e.args[0]: self.op_error_trait = e.args[0] self.estimate_error = e.args[-1] self.status = "invalid" return False except CytoflowError as e: self.estimate_error = e.__str__() self.status = "invalid" return False finally: try: plt.gcf().canvas.set_working(False) except AttributeError: pass
[docs] def apply(self): """ Calls `operation`'s `IOperation.apply`; applies this `WorkflowItem`'s operation to `previous_wi`'s result """ logger.debug("WorkflowItem.apply :: {}".format((self))) self.workflow.apply_calls += 1 prev_result = self.previous_wi.result if self.previous_wi else None with warnings.catch_warnings(record = True) as w: try: self.status = "applying" try: plt.gcf().canvas.set_working(True) except AttributeError: pass r = self.operation.apply(prev_result) self.result = r self.op_error = "" self.op_error_trait = "" if w: self.op_warning = w[-1].message.__str__() else: self.op_warning = "" self.op_warning_trait = "" self.status = "valid" except CytoflowOpError as e: self.result = None if e.args[0]: self.op_error_trait = e.args[0] self.op_error = e.args[-1] self.status = "invalid" except CytoflowError as e: self.result = None self.op_error = e.args[-1] self.status = "invalid" finally: try: plt.gcf().canvas.set_working(False) except AttributeError: pass
[docs] def plot(self): """ Call `current_view`'s `IView.plot` on `result`, or on `previous_wi`'s `result` if there's no current `result`. """ logger.debug("WorkflowItem.plot :: {}".format((self))) self.workflow.plot_calls += 1 if not self.current_view: self.plot_lock.acquire() self.matplotlib_events.clear() plt.clf() plt.show() self.matplotlib_events.set() self.plot_lock.release() return with warnings.catch_warnings(record = True) as w: try: self.plot_lock.acquire() self.matplotlib_events.clear() try: plt.gcf().canvas.set_working(True) except AttributeError: pass if this.last_view_plotted is not None and "interactive" in this.last_view_plotted.traits(): this.last_view_plotted.interactive = False plot_params = self.current_view.plot_params.trait_get() if self.result: self.current_view.plot(self.result, **plot_params) elif self.previous_wi and self.previous_wi.result: self.current_view.plot(self.previous_wi.result, **plot_params) warnings.warn("Warning: plotting previous operation's result") else: raise CytoflowViewError(None, "Nothing to plot!") self.view_error = "" self.view_error_trait = "" if "interactive" in self.current_view.traits(): self.current_view.interactive = True this.last_view_plotted = self.current_view # the remote canvas/pyplot interface of the multiprocess backend # is NOT interactive. this call lets us batch together all # the plot updates plt.show() except CytoflowViewError as e: if e.args[0]: self.view_error_trait = e.args[0] self.view_error = e.args[-1] plt.clf() plt.show() except CytoflowError as e: self.view_error = e.__str__() plt.clf() plt.show() finally: self.matplotlib_events.set() try: plt.gcf().canvas.set_working(False) except AttributeError: pass self.plot_lock.release() if w: self.view_warning = w[-1].message.__str__() else: self.view_warning = "" return True
@camel_registry.dumper(WorkflowItem, 'workflow-item', version = 4) def _dump_wi(wi): # we really don't need to keep copying around the fcs metadata # it will still get saved out in the import op if 'fcs_metadata' in wi.metadata: del wi.metadata['fcs_metadata'] return dict(operation = wi.operation, views = wi.views, channels = wi.channels, conditions = wi.conditions, metadata = wi.metadata, statistics = wi.statistics, current_view = wi.current_view) @camel_registry.dumper(WorkflowItem, 'workflow-item', version = 3) def _dump_wi_v3(wi): # we really don't need to keep copying around the fcs metadata # it will still get saved out in the import op if 'fcs_metadata' in wi.metadata: del wi.metadata['fcs_metadata'] return dict(operation = wi.operation, views = wi.views, channels = wi.channels, conditions = wi.conditions, metadata = wi.metadata, statistics = wi.statistics, current_view = wi.current_view, default_view = wi.default_view) @camel_registry.dumper(WorkflowItem, 'workflow-item', version = 2) def _dump_wi_v2(wi): # we really don't need to keep copying around the fcs metadata # it will still get saved out in the import op if 'fcs_metadata' in wi.metadata: del wi.metadata['fcs_metadata'] return dict(deletable = wi.deletable, operation = wi.operation, views = wi.views, channels = wi.channels, conditions = wi.conditions, metadata = wi.metadata, statistics = wi.statistics, current_view = wi.current_view, default_view = wi.default_view) @camel_registry.dumper(WorkflowItem, 'workflow-item', version = 1) def _dump_wi_v1(wi): return dict(deletable = False if wi.operation.id == "edu.mit.synbio.cytoflow.operations.import" else True, operation = wi.operation, views = wi.views, channels = wi.channels, conditions = wi.conditions, metadata = wi.metadata, statistics = list(wi.statistics.keys()), current_view = wi.current_view, default_view = wi.default_view) @camel_registry.loader('workflow-item', version = 1) def _load_wi_v1(data, version): data['statistics'] = {tuple(k) : pd.Series() for k in data['statistics']} del data['deletable'] del data['default_view'] ret = WorkflowItem(**data) return ret @camel_registry.loader('workflow-item', version = 2) def _load_wi_v2(data, version): del data['deletable'] del data['default_view'] return WorkflowItem(**data) @camel_registry.loader('workflow-item', version = 3) def _load_wi_v3(data, version): del data['default_view'] return WorkflowItem(**data) @camel_registry.loader('workflow-item', version = 4) def _load_wi(data, version): return WorkflowItem(**data)