Source code for cytoflowgui.workflow.workflow

#!/usr/bin/env python3.8
# coding: latin-1

# (c) Massachusetts Institute of Technology 2015-2018
# (c) Brian Teague 2018-2022
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
cytoflowgui.workflow.workflow
-----------------------------

The main model for the GUI.

At its core, the model is a list of `WorkflowItem` instances.  A `WorkflowItem`
wraps an operation, its completion status, the result of applying it to
the previous `WorkflowItem`'s result, and `IView`'s on that result.  The Workflow
also maintains a "current" or selected WorkflowItem.

The left panel of the GUI is a View on this object (viewing the list of
WorkflowItem instances), and the right panel of the GUI is a View of the
selected WorkflowItem's current view.

So far, so simple.  However, in a single-threaded GUI application, the UI
freezes when something processor-intensive is happening.  Adding another thread
doesn't help matters because of the CPython global interpreter lock; while
Python is otherwise computing, the GUI doesn't update.  To solve this, the 
Workflow maintains a copy of itself in a separate process.  The `LocalWorkflow`
is the one that is viewed by the GUI; the `RemoteWorkflow` is the one that
actually loads the data and does the processing.  Thus the GUI remains
responsive.  Changed attributes in either Workflow are noticed by a set of 
Traits handlers, which send those changes to the other process.

This process is also where the plotting happens.  For an explanation of how
the plots are ferried back to the GUI, see the module docstring for
`cytoflowgui.matplotlib_backend_local` and `cytoflowgui.matplotlib_backend_remote`
"""

'''
Data & control flow notes: 

When a local operation trait changes (with the metadata "apply" or "estimate")
1. Handler in local workflow notices
2. Trait is sent to remote workflow
3. Remote operation trait is updated
4. If the trait has metadata "apply":
   4a. Handler in remote workflow notices the updated operation trait
   4b. Handler asks operation "should I call "apply?"
   4c. If yes, handler adds an apply() call to the execution queue.
5. If the trait has metadata "estimate":
   5a. Handler in remote workflow notices the updated operation trait
   5b: Handler asks the operation "should I clear the estimate?"
   5c: If yes, handler calls operation.clear_estimate()

When a remote operation trait changes (with the metadata "status")
1. Handler in remote workflow notices
2. Trait is sent to local workflow
3. Local trait is updated
4. UI notices and reacts

When the "Estimate" button is pressed
1. The Controller fires the operation's "do_estimate" event
2. The dedicated local workflow handler notices
3. The local workflow sends an ESTIMATE message to the remote workflow
4. The remote workflow adds a call to estimate() to its execution queue

When a WorkflowItem's result changes
1. The handler in the remote workflow notices
2. It updates the WorkflowItem's channels, conditions, statistics, metadata
3. It asks the current view "should I plot?"
   3a. If yes, adds a call to plot() to the execution queue

When the previous WorkflowItem's result changes
1. The handler in the remote workflow notices
2. It asks the current operation if the estimate should be cleared?
   2a. If yes, clear the estimate
3. It asks the current operation if it should re-apply?
   3a. If yes, sets the WI status to Invalid" and adds a call to apply() to the execution queue
4. It asks if the current view should re-plot?
   4a. If yes, adds a call to plot() to the execution queue

When a remote operation trait changes (with the metadata estimate_result)
1. The handler in the remote workflow notices
2. If the operation's WorkflowItem is the current item and it has a current view:
   2a. Ask if the view should update
   2b. If yes, add a call to plot() to the execution queue
3. Ask the operation if it should apply?
   3a. If yes, add a call to apply() to the execution queue

When a local view trait changes 
1. Handler in the local workflow notices
2. Trait is sent to remote workflow
3. Remote view instance trait is updated
4. Remote workflow handler notices
5. If the WorkflowInstance is the selected one, and the view is the current view:
   5a. Asks if the view should plot?
   5b. If yes, adds the plot call to the execution queue

'''

import sys, threading, logging

from queue import Queue, PriorityQueue

from traits.api import (HasStrictTraits, Int, Bool, Instance, Any, List,
                        observe, ComparisonMode)

import matplotlib.pyplot as plt
import cytoflowgui.matplotlib_backend_remote

from cytoflowgui.utility import log_exception

from .workflow_item import WorkflowItem
from .views import IWorkflowView
from . import Changed

logger = logging.getLogger(__name__)

[docs]class Msg(object): """ Messages passed between the local and remote workflows""" # make an entirely new workflow. payload: # - a list of WorkflowItems NEW_WORKFLOW = "NEW_WORKFLOW" # add an item to the remote workflow. payload: # - the index of the new workflow item # - the workflow item itself ADD_ITEMS = "ADD_ITEMS" # remove a workflow item. payload: # - the index of the item to remove REMOVE_ITEMS = "REMOVE_ITEMS" # update which workflow item is selected. payload: # - the index of the selected workflow item SELECT = "SELECT" # send an update to an operation. payload: # - the WorkflowItem index # - the name of the updated trait # - the new trait value UPDATE_OP = "UPDATE_OP" # send an update to a view. payload: # - the WorkflowItem index # - the view id # - the name of the updated trait # - the new trait value UPDATE_VIEW = "UPDATE_VIEW" CHANGE_CURRENT_VIEW = "CHANGE_CURRENT_VIEW" CHANGE_CURRENT_PLOT = "CHANGE_CURRENT_PLOT" # send an update to a WorkflowItem. payload: # - the WorkflowItem index # - the name of the updated trait # - the new trait value UPDATE_WI = "UPDATE_WI" # CHANGE_DEFAULT_SCALE = "CHANGE_DEFAULT_SCALE" ESTIMATE = "ESTIMATE" APPLY_CALLED = "APPLY_CALLED" PLOT_CALLED = "PLOT_CALLED" # a statement to evaluate in the remote process, or the result of that # evaluation. EVAL = "EVAL" # statement execution EXEC = "EXEC" # apply & execute all RUN_ALL = "RUN_ALL" SHUTDOWN = "SHUTDOWN"
[docs]class UniquePriorityQueue(PriorityQueue): """ A PriorityQueue that only allows one copy of each item. http://stackoverflow.com/questions/5997189/how-can-i-make-a-unique-value-priority-queue-in-python """ def _init(self, maxsize): PriorityQueue._init(self, maxsize) self.values = set() def _put(self, item): if item[1] not in self.values: self.values.add(item[1]) PriorityQueue._put(self, item) else: pass def _get(self): item = PriorityQueue._get(self) self.values.remove(item[1]) return item
[docs]def filter_unpicklable(obj): """Recursively filter unpicklable items from lists and dictionaries""" if type(obj) is list: return [filter_unpicklable(x) for x in obj] elif type(obj) is dict: return {x: filter_unpicklable(obj[x]) for x in obj} else: if not hasattr(obj, '__getstate__') and not isinstance(obj, (str, int, float, tuple, list, set, dict)): return "Unpicklable: {}".format(type(obj)) else: return obj
[docs]class LocalWorkflow(HasStrictTraits): """ The workflow that is maintained in the "local" process -- ie, the same one that showing a GUI. """ workflow = List(WorkflowItem, comparison_mode = ComparisonMode.identity) """The list of `WorkflowItem`\s""" selected = Instance(WorkflowItem) """The currently-selected `WorkflowItem`""" modified = Bool """Has this workflow been modified since it was loaded?""" recv_thread = Instance(threading.Thread) """The `threading.Thread` that receives messages from the remote process""" send_thread = Instance(threading.Thread) """The `threading.Thread` that sends messages to the remote process""" message_q = Instance(Queue, ()) """The `queue.Queue` of messages to send to the remote process""" debug = Bool(False) # count the number of times the remote process calls apply() or plot(). # useful for debugging apply_calls = Int(0) plot_calls = Int(0) # evaluate an expression in the remote process. useful for debugging. eval_event = Instance(threading.Event, ()) eval_result = Any # execute an expression in the remote process. useful for debugging. exec_event = Instance(threading.Event, ()) def __init__(self, remote_workflow_connection, **kwargs): super(LocalWorkflow, self).__init__(**kwargs) self.recv_thread = threading.Thread(target = self.recv_main, name = "local workflow recv", args = [remote_workflow_connection]) self.recv_thread.daemon = True self.recv_thread.start() self.send_thread = threading.Thread(target = self.send_main, name = "local workflow send", args = [remote_workflow_connection]) self.send_thread.daemon = True self.send_thread.start()
[docs] def recv_main(self, child_conn): """ The method that runs in `recv_thread` to receive messages from the remote process. """ while child_conn.poll(None): try: (msg, payload) = child_conn.recv() except EOFError: return logger.debug("LocalWorkflow.recv_main :: {}".format((msg, payload))) try: if msg == Msg.UPDATE_WI: (idx, name, new) = payload wi = self.workflow[idx] if not wi.trait(name).status: raise RuntimeError("Tried to set a local non-status wi trait") with wi.lock: wi.trait_set(**{name : new}) elif msg == Msg.UPDATE_OP: (idx, name, new) = payload wi = self.workflow[idx] if not wi.operation.trait(name).status: raise RuntimeError("Tried to set a local non-status trait") with wi.lock: wi.operation.trait_set(**{name : new}) elif msg == Msg.UPDATE_VIEW: (idx, view_id, name, new) = payload wi = self.workflow[idx] view = next((x for x in wi.views if x.id == view_id)) if not view.trait(name).status: raise RuntimeError("Tried to set a local non-status trait") with wi.lock: view.trait_set(**{name : new}) elif msg == Msg.APPLY_CALLED: self.apply_calls = payload elif msg == Msg.PLOT_CALLED: self.plot_calls = payload elif msg == Msg.EVAL: self.eval_result = payload self.eval_event.set() elif msg == Msg.EXEC: self.exec_event.set() elif msg == Msg.SHUTDOWN: self.message_q.put(None) return else: raise RuntimeError("Bad message from remote") except Exception: log_exception()
[docs] def send_main(self, child_conn): """ The method that runs in `send_thread` to send messages from `message_q` to the remote process """ try: while True: msg = self.message_q.get() if msg == None: return child_conn.send(msg) except Exception: log_exception()
[docs] def run_all(self): """ Send the RUN_ALL message to the remote process """ self.message_q.put((Msg.RUN_ALL, None))
@observe('workflow') def _on_new_workflow(self, _): logger.debug("LocalWorkflow._on_new_workflow") self.selected = None # send the new workflow to the child process self.message_q.put((Msg.NEW_WORKFLOW, self.workflow)) @observe('workflow:items') def _on_workflow_add_remove_items(self, event): logger.debug("LocalWorkflow._on_workflow_add_remove_items :: {}" .format(event)) idx = event.index self.modified = True # remove deleted items from the linked list if event.removed: assert len(event.removed) == 1 removed = event.removed[0] if removed.previous_wi: removed.previous_wi.next_wi = removed.next_wi if removed.next_wi: removed.next_wi.previous_wi = removed.previous_wi self.message_q.put((Msg.REMOVE_ITEMS, idx)) if removed == self.selected: self.selected = None # add new items to the linked list if event.added: assert len(event.added) == 1 if idx > 0: # populate the new wi with metadata from the old one self.workflow[idx].channels = list(self.workflow[idx - 1].channels) self.workflow[idx].conditions = dict(self.workflow[idx - 1].conditions) self.workflow[idx].metadata = dict(self.workflow[idx - 1].metadata) self.workflow[idx].statistics = dict(self.workflow[idx - 1].statistics) self.workflow[idx - 1].next_wi = self.workflow[idx] self.workflow[idx].previous_wi = self.workflow[idx - 1] if idx < len(self.workflow) - 1: self.workflow[idx].next_wi = self.workflow[idx + 1] self.workflow[idx + 1].previous_wi = self.workflow[idx] self.message_q.put((Msg.ADD_ITEMS, (idx, event.added[0]))) @observe('selected') def _on_selected_changed(self, event): logger.debug("LocalWorkflow._on_selected_changed :: {}".format(event)) if event.new is None: idx = -1 else: idx = self.workflow.index(event.new) self.message_q.put((Msg.SELECT, idx)) @observe('workflow:items:operation:[+apply,+estimate]') def _on_operation_changed(self, event): logger.debug("LocalWorkflow._operation_changed :: {}".format(event)) if event.name == 'changed': event.name = event.new event.new = event.object.trait_get(event.name)[event.name] wi = next((x for x in self.workflow if x.operation is event.object)) idx = self.workflow.index(wi) self.message_q.put((Msg.UPDATE_OP, (idx, event.name, event.new))) self.modified = True @observe('workflow:items:operation:do_estimate') def _on_estimate(self, event): logger.debug("LocalWorkflow._on_estimate :: {}".format(event)) wi = next((x for x in self.workflow if x.operation is event.object)) idx = self.workflow.index(wi) self.message_q.put((Msg.ESTIMATE, idx)) @observe('workflow:items:views:items:+type') def _on_view_changed(self, event): logger.debug("LocalWorkflow._view_changed :: {}".format(event)) if event.name == 'changed': event.name = event.new event.new = event.object.trait_get(event.name)[event.name] # filter out anything that's transient (like properties) # in particular, delegate properties should NOT be sent. # they get sent over as an operation property and are properly # updated on the remote end (no need to set them to be transient) if (event.object.trait(event.name).transient or event.object.trait(event.name).status or event.object.trait(event.name).type == "delegate"): return wi = next((x for x in self.workflow if event.object in x.views)) idx = self.workflow.index(wi) self.message_q.put((Msg.UPDATE_VIEW, (idx, event.object.id, event.name, event.new))) self.modified = True @observe('workflow:items:current_view') def _on_current_view_changed(self, event): logger.debug("LocalWorkflow._on_current_view_changed :: {}" .format(event)) idx = self.workflow.index(event.object) view = event.object.current_view self.message_q.put((Msg.CHANGE_CURRENT_VIEW, (idx, view)))
[docs] def remote_eval(self, expr): """ Evaluate an expression in the remote process and return the result """ self.eval_event.clear() self.message_q.put((Msg.EVAL, expr)) self.eval_event.wait() return self.eval_result
[docs] def remote_exec(self, expr): """ Execute an expression in the remote process and wait until it completes """ self.exec_event.clear() self.message_q.put((Msg.EXEC, expr)) self.exec_event.wait() return
[docs] def wi_sync(self, wi, variable, value, timeout = 30): """ Set `WorkflowItem.status` on the remote workflow, then wait for it to propogate here. """ assert(wi.trait_get([variable])[variable] != value) idx = self.workflow.index(wi) self.remote_exec("self.workflow[{0}].trait_set({1} = '{2}')".format(idx, variable, value)) self.wi_waitfor(wi, variable, value, timeout)
[docs] def wi_waitfor(self, wi, variable, value, timeout = 30): """Waits a configurable amount of time for wi's status to change to status""" from traits.util.async_trait_wait import wait_for_condition try: wait_for_condition(lambda v: v.trait_get([variable])[variable] == value, wi, variable, timeout) except RuntimeError: logger.error("Timed out after {} seconds waiting for {} to become {}.\n" "Current value: {}\n" "WorkflowItem.op_error: {}\n" "WorkflowItem.estimate_error: {}\n" "WorkflowItem.view_error: {}" .format(timeout, variable, value, wi.trait_get([variable])[variable], wi.op_error, wi.estimate_error, wi.view_error)) raise
[docs] def shutdown_remote_process(self, remote_process): """Shut down the remote process""" # tell the remote process to shut down self.message_q.put((Msg.SHUTDOWN, None)) # the SHUTDOWN message stops the sending thread self.send_thread.join() # the reply from the remote process stops the receiving thread self.recv_thread.join() # make sure the remote process has shut down entirely remote_process.join()
[docs]class RemoteWorkflow(HasStrictTraits): """ The workflow that is maintained in the "remote" process -- ie, the one that actually does the processing. """ workflow = List(WorkflowItem, comparison_mode = ComparisonMode.identity) """The list of `WorkflowItem`'s """ selected = Instance(WorkflowItem) """The currently-selected `WorkflowItem`""" last_view_plotted = Instance(IWorkflowView) """The last `IWorkflowView` that was plotted""" send_thread = Instance(threading.Thread) """The `threading.Thread` that sends messages to the local process""" recv_thread = Instance(threading.Thread) """The `threading.Thread` that receives messages from the local process""" message_q = Instance(Queue, ()) """The `queue.Queue` of messages to send to the local process""" # synchronization primitives for plotting matplotlib_events = Any """`threading.Event` to synchronize matplotlib plotting across process boundaries""" plot_lock = Any """`threading.Lock` to synchronize matplotlib plotting across process boundaries""" exec_q = Instance(UniquePriorityQueue, ()) exec_lock = Instance(threading.Lock, ()) # for debugging apply_calls = Int(0) plot_calls = Int(0)
[docs] def run(self, parent_workflow_conn, parent_mpl_conn = None): """ The method that runs the main loop of the remote process """ # set up the plotting synchronization primitives self.matplotlib_events = threading.Event() self.plot_lock = threading.Lock() # configure matplotlib backend to use the pipe if parent_mpl_conn: plt.new_figure_manager = lambda num, parent_conn = parent_mpl_conn, process_events = self.matplotlib_events, plot_lock = self.plot_lock, *args, **kwargs: \ cytoflowgui.matplotlib_backend_remote.new_figure_manager(num, parent_conn = parent_conn, process_events = process_events, plot_lock = plot_lock, constrained_layout = True, *args, **kwargs) # start threads self.recv_thread = threading.Thread(target = self.recv_main, name = "remote recv thread", args = [parent_workflow_conn]) self.recv_thread.start() self.send_thread = threading.Thread(target = self.send_main, name = "remote send thread", args = [parent_workflow_conn]) self.send_thread.start() # loop and process updates while True: try: _, (wi, fn) = self.exec_q.get() if wi: with wi.lock: fn() else: if fn is None: self.shutdown() return else: fn() self.exec_q.task_done() except Exception: log_exception()
[docs] def recv_main(self, parent_conn): """ The method that runs in the `recv_thread` to receive messages from the local process. """ while parent_conn.poll(None): try: (msg, payload) = parent_conn.recv() except EOFError: return logger.debug("RemoteWorkflow.recv_main :: {}".format((msg, payload))) try: if msg == Msg.NEW_WORKFLOW: self.workflow = [] for new_item in payload: idx = len(self.workflow) wi = WorkflowItem(workflow = self) wi.lock.acquire() wi.matplotlib_events = self.matplotlib_events wi.plot_lock = self.plot_lock wi.copy_traits(new_item, status = lambda t: t is not True) self.workflow.append(wi) for wi in self.workflow: wi.lock.release() elif msg == Msg.RUN_ALL: for wi in self.workflow: wi.lock.acquire() for idx, wi in enumerate(self.workflow): if hasattr(wi.operation, "estimate"): self.exec_q.put((idx - 0.1, (wi, wi.estimate))) self.exec_q.put((idx, (wi, wi.apply))) for wi in self.workflow: wi.lock.release() elif msg == Msg.ADD_ITEMS: (idx, new_item) = payload wi = WorkflowItem(workflow = self) wi.lock.acquire() wi.copy_traits(new_item) wi.matplotlib_events = self.matplotlib_events wi.plot_lock = self.plot_lock self.workflow.insert(idx, wi) self.exec_q.put((idx, (wi, wi.apply))) wi.lock.release() elif msg == Msg.REMOVE_ITEMS: idx = payload self.workflow.remove(self.workflow[idx]) elif msg == Msg.SELECT: idx = payload if idx == -1: self.selected = None else: self.selected = self.workflow[idx] elif msg == Msg.UPDATE_OP: (idx, name, new) = payload wi = self.workflow[idx] with wi.lock: if wi.operation.trait(name).status: raise RuntimeError("Tried to set remote status trait '{}'".format(name)) if wi.operation.trait(name).fixed: raise RuntimeError("Tried to set remote fixed trait '{}'".format(name)) if wi.operation.trait(name).transient: raise RuntimeError("Tried to set remote transient trait '{}'") wi.operation.trait_set(**{name : new}) elif msg == Msg.UPDATE_VIEW: (idx, view_id, name, new) = payload wi = self.workflow[idx] try: view = next((x for x in wi.views if x.id == view_id)) except StopIteration: logger.warn("RemoteWorkflow: Couldn't find view '{}'".format(view_id)) continue with wi.lock: if view.trait(name).status: raise RuntimeError("Tried to set remote status trait '{}'".format(name)) if view.trait(name).fixed: raise RuntimeError("Tried to set remote fixed trait '{}'".format(name)) if view.trait(name).transient: raise RuntimeError("Tried to set remote transient trait '{}'".format(name)) view.trait_set(**{name : new}) elif msg == Msg.CHANGE_CURRENT_VIEW: (idx, view) = payload wi = self.workflow[idx] try: wi.current_view = next((x for x in wi.views if x.id == view.id)) except StopIteration: if wi.default_view and view.id == wi.default_view.id: wi.views.append(wi.default_view) wi.current_view = wi.default_view else: wi.views.append(view) wi.current_view = view elif msg == Msg.CHANGE_CURRENT_PLOT: (idx, plot) = payload wi = self.workflow[idx] wi.current_plot = plot elif msg == Msg.ESTIMATE: idx = payload wi = self.workflow[idx] self.exec_q.put((idx - 0.1, (wi, wi.estimate))) elif msg == Msg.SHUTDOWN: # tell the parent process to shut down self.exec_q.put((len(self.workflow) + 1, (None, None))) # exit the receiving thread return elif msg == Msg.EXEC: self.exec_q.put((sys.maxsize - 1, (None, lambda self = self, q = self.message_q, expr = payload: q.put((Msg.EXEC, exec(expr)))))) elif msg == Msg.EVAL: self.exec_q.put((sys.maxsize, (None, lambda self = self, q = self.message_q, expr = payload: q.put((Msg.EVAL, eval(expr)))))) else: raise RuntimeError("Bad command in the remote workflow") except Exception: log_exception()
[docs] def send_main(self, parent_conn): """ The method that runs in `send_thread` to send messages to the local process. """ try: while True: msg = self.message_q.get() parent_conn.send(msg) if msg[0] == Msg.SHUTDOWN: return except Exception: log_exception()
[docs] def shutdown(self): """Shut down the remote process""" # make sure the receiving thread is shut down self.recv_thread.join() # shut down the sending thread self.message_q.put((Msg.SHUTDOWN, 0)) self.send_thread.join()
@observe('apply_calls', post_init = True) def _on_apply_called(self, _): self.message_q.put((Msg.APPLY_CALLED, self.apply_calls)) @observe('plot_calls', post_init = True) def _on_plot_called(self, _): self.message_q.put((Msg.PLOT_CALLED, self.plot_calls)) @observe('workflow:items', post_init = True) def _on_workflow_add_remove_items(self, event): logger.debug("RemoteWorkflow._on_workflow_add_remove_items :: {}" .format((event.index, event.removed, event.added))) idx = event.index # remove deleted items from the linked list if event.removed: assert len(event.removed) == 1 removed = event.removed[0] if removed.previous_wi: removed.previous_wi.next_wi = removed.next_wi if removed.next_wi: removed.next_wi.previous_wi = removed.previous_wi # add new items to the linked list if event.added: assert len(event.added) == 1 if idx > 0: self.workflow[idx - 1].next_wi = self.workflow[idx] self.workflow[idx].previous_wi = self.workflow[idx - 1] if idx < len(self.workflow) - 1: self.workflow[idx].next_wi = self.workflow[idx + 1] self.workflow[idx + 1].previous_wi = self.workflow[idx] @observe('workflow:items:operation:+apply') def _on_operation_apply_changed(self, event): logger.debug("RemoteWorkflow._operation_apply_changed :: {}".format(event)) assert(event.name != 'changed') wi = next((x for x in self.workflow if x.operation is event.object)) idx = self.workflow.index(wi) if wi.operation.should_apply(Changed.APPLY, event): with wi.lock: wi.result = None wi.status = "invalid" self.exec_q.put((idx, (wi, wi.apply))) @observe('workflow:items:operation:+estimate') def _on_operation_estimate_changed(self, event): logger.debug("RemoteWorkflow._operation_estimate_changed :: {}".format(event)) assert(event.name != 'changed') wi = next((x for x in self.workflow if x.operation is event.object)) if wi.operation.should_clear_estimate(Changed.ESTIMATE, event): wi.operation.clear_estimate() @observe('workflow:items:operation:+estimate_result') def _on_estimate_result_changed(self, event): logger.debug("RemoteWorkflow._estimate_result_changed :: {}".format(event)) wi = next((x for x in self.workflow if x.operation is event.object)) idx = self.workflow.index(wi) if wi.operation.should_apply(Changed.ESTIMATE_RESULT, event): self.exec_q.put((idx, (wi, wi.apply))) if (wi == self.selected and wi.current_view and wi.current_view.should_plot(Changed.ESTIMATE_RESULT, event)): self.exec_q.put((idx + 0.1, (wi, wi.plot))) @observe('workflow:items:operation:+status') def _on_operation_status_changed(self, event): wi = next((x for x in self.workflow if x.operation is event.object)) idx = self.workflow.index(wi) self.message_q.put((Msg.UPDATE_OP, (idx, event.name, event.new))) @observe('workflow:items:result', post_init = True) def _on_result_changed(self, event): logger.debug("RemoteWorkflow._result_changed :: {}".format(event)) wi = event.object idx = self.workflow.index(wi) if wi.result: wi.channels = list(wi.result.channels) wi.conditions = dict(wi.result.conditions) wi.statistics = dict(wi.result.statistics) # some things in metadata are unpicklable, functions and such, # so filter them out. wi.metadata = filter_unpicklable(dict(wi.result.metadata)) if (wi == self.selected and wi.current_view and wi.current_view.should_plot(Changed.RESULT, event)): self.exec_q.put((idx + 0.1, (wi, wi.plot))) if wi.next_wi: next_wi = wi.next_wi next_idx = self.workflow.index(next_wi) if (next_wi is not None and next_wi.operation.should_clear_estimate(Changed.PREV_RESULT, event)): next_wi.operation.clear_estimate() if (next_wi is not None and next_wi.operation.should_apply(Changed.PREV_RESULT, event)): with next_wi.lock: next_wi.result = None next_wi.status = 'invalid' self.exec_q.put((next_idx, (next_wi, next_wi.apply))) if (next_wi == self.selected and next_wi.current_view and next_wi.current_view.should_plot(Changed.PREV_RESULT, event)): self.exec_q.put((next_idx + 0.1), (wi, wi.plot)) @observe('workflow:items:views:items:+type') def _on_view_changed(self, event): logger.debug("RemoteWorkflow._view_changed :: {}".format(event)) assert(event.name != 'changed') # filter out anything that's transient (like properties) if event.object.trait(event.name).transient: return view = event.object wi = next((x for x in self.workflow if view in x.views)) idx = self.workflow.index(wi) if event.object.trait(event.name).status: self.message_q.put((Msg.UPDATE_VIEW, (idx, view.id, event.name, event.new))) return if (wi == self.selected and wi.current_view == view and wi.current_view.should_plot(Changed.VIEW, event)): self.exec_q.put((idx + 0.1, (wi, wi.plot))) @observe('workflow:items:current_view') def _on_current_view_changed(self, event): logger.debug("RemoteWorkflow._current_view_changed :: {}".format(event)) wi = event.object if wi == self.selected: idx = self.workflow.index(wi) self.exec_q.put((idx + 0.1, (wi, wi.plot))) @observe('workflow:items:+status', post_init = True) def _on_workflow_item_changed(self, event): logger.debug("RemoteWorkflow._workflow_item_changed :: {}".format(event)) idx = self.workflow.index(event.object) self.message_q.put((Msg.UPDATE_WI, (idx, event.name, event.new))) @observe('selected', post_init = True) def _on_selected_workflowitem_changed(self, event): logger.debug("RemoteWorkflow._selected_changed :: {}".format(event)) if event.new is None: plt.clf() plt.show() else: idx = self.workflow.index(event.new) if event.new.current_view: self.exec_q.put((idx + 0.1, (event.new, event.new.plot)))