Source code for openalea.core.observer

#   -*- python -*-
#
#         OpenAlea.Core
#
#         Copyright 2006-2009 INRIA - CIRAD - INRA
#
#         File author(s): Samuel Dufour-Kowalski <samuel.dufour@sophia.inria.fr>
#                         Christophe Pradal <christophe.prada@cirad.fr>
#
#         Distributed under the Cecill-C License.
#         See accompanying file LICENSE.txt or copy at
#             http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#         OpenAlea WebSite : http://openalea.gforge.inria.fr
#
###############################################################################
"""This module defines all the classes for the Observer design Pattern"""

from __future__ import print_function
__license__ = "Cecill-C"
__revision__ = " $Id$ "

# Probe for the optional openalea.grapheditor package. `graphobserver` records
# whether the import succeeded and drives the conditional import below.
try:
    import openalea.grapheditor
    graphobserver = True
except ImportError as e:
    # Package not importable: remember that and use the fallback imports.
    graphobserver = False

###############################################################################

if graphobserver:
    # Re-export every public name of openalea.grapheditor.observer from here.
    from openalea.grapheditor.observer import *
else:
   # Standard-library helpers referenced by the classes that follow in this
   # module (weak references to listeners, FIFO queue of pending events).
   import weakref
   from collections import deque


class Observed(object):
    """ Observed Object

    Keeps a set of weak references to its listeners and forwards events to
    them through ``listener.call_notify(self, event)``.  Registrations and
    unregistrations requested while a notification is running are queued and
    applied once the outermost notification has finished.
    """

    def __init__(self):
        self.listeners = set()  # weakref.ref objects pointing at listeners
        self.__isNotifying = False
        self.__postNotifs = []  # calls to execute after a notification is done
        self.__exclusive = None  # listener set by exclusive_command, if any
        self.__blockNotifs = False  # not read anywhere in this class

    def register_listener(self, listener):
        """ Add listener to list of listeners.

        If the observed is currently notifying, the registration is delayed
        until it finishes.

        :param listener: the object to notify; only a weak reference is kept
        """
        if self.__isNotifying:
            def push_listener_after():
                self.register_listener(listener)
            self.__postNotifs.append(push_listener_after)
            return
        # The weakref callback drops the reference once the listener dies.
        self.listeners.add(weakref.ref(listener, self.unregister_listener))

    def unregister_listener(self, listener):
        """ Remove listener from the list of listeners

        If the observed is currently notifying, the removal is delayed until
        it finishes.

        :param listener: either the listener itself or the weak reference
            stored for it
        """
        if self.__isNotifying:
            def discard_listener_after():
                self.unregister_listener(listener)
            self.__postNotifs.append(discard_listener_after)
            return
        if isinstance(listener, weakref.ref):
            self.listeners.discard(listener)
            return
        for ref in self.listeners:
            if ref() == listener:
                # Safe: the loop is left right after the set is modified.
                self.listeners.discard(ref)
                break

    def transfer_listeners(self, newObs):
        """Takes all this observed's listeners, unregisters them from itself
        and registers them to the newObs, calling listener.change_observed

        :param newObs: the Observed that receives the listeners
        """
        was_notifying = self.__isNotifying
        # Raising the flag defers every change to self.listeners made below
        # (and by the change_observed callbacks) until the loop is over.
        self.__isNotifying = True
        try:
            for ref in tuple(self.listeners):
                listener = ref()
                self.unregister_listener(ref)
                if listener is None:
                    # Referent already collected: nothing to hand over.
                    continue
                newObs.register_listener(listener)
                listener.change_observed(self, newObs)
        finally:
            self.__isNotifying = was_notifying
            if not was_notifying:
                # Apply the deferred unregistrations now instead of leaving
                # them pending until some later notification.
                self.post_notification()

    def exclusive_command(self, who, command, *args, **kargs):
        """Executes a call "command" and if it triggers any signal from this
        observed object along the way, "who" will be the only one to be
        notified

        :param who: a registered listener
        :param command: callable invoked as ``command(*args, **kargs)``
        :raises Exception: if ``who`` is not a registered listener
        """
        if who not in [ref() for ref in self.listeners]:
            raise Exception("Observed.exclusive : " + str(who) + " is not registered")
        previous = self.__exclusive
        self.__exclusive = who
        try:
            command(*args, **kargs)
        finally:
            # Always leave exclusive mode, even when the command raises.
            self.__exclusive = previous

    def notify_listeners(self, event=None):
        """ Send a notification to all listeners

        Listeners whose ``is_notification_locked()`` is true are skipped.  A
        failure inside a listener is reported on standard output and does not
        stop the remaining listeners from being notified.

        :param event: an object to pass to the notify function
        """
        was_notifying = self.__isNotifying
        self.__isNotifying = True
        try:
            # If an exclusive handler is set let's only notify that one.
            if self.__exclusive is not None:
                self.__exclusive.call_notify(self, event)
            else:
                # Iterate over a snapshot so that a nested notification
                # pruning dead references cannot disturb this loop.
                for ref in tuple(self.listeners):
                    listener = ref()
                    if listener is None:
                        self.listeners.discard(ref)
                        continue
                    if listener.is_notification_locked():
                        continue
                    try:
                        listener.call_notify(self, event)
                    except Exception as e:
                        print("Warning :", str(self), "notification of", str(listener), "failed", e)
        finally:
            # Restore rather than clear, so that a nested call does not end
            # the outer notification early.
            self.__isNotifying = was_notifying
            if not was_notifying:
                self.post_notification()

    def post_notification(self):
        """Run the (un)registrations that were delayed during a notification."""
        pending, self.__postNotifs = self.__postNotifs, []
        started = 0
        try:
            for action in pending:
                started += 1
                action()
        finally:
            if started < len(pending):
                # An action raised: keep the ones that have not run yet.
                self.__postNotifs[:0] = pending[started:]

    def __getstate__(self):
        """ Pickle function

        Listeners and transient notification state are not pickled.
        """
        odict = self.__dict__.copy()
        odict['listeners'] = set()
        # The attributes below hold closures / live listeners that only make
        # sense inside the running process.
        odict['_Observed__isNotifying'] = False
        odict['_Observed__postNotifs'] = []
        odict['_Observed__exclusive'] = None
        return odict
class AbstractListener(object):
    """ Listener base class

    Subclasses implement :meth:`notify`; observed objects deliver events
    through :meth:`call_notify`.
    """

    notify_lock = None
    # Class-level defaults (name-mangled like the instance attributes) so that
    # subclasses which do not call AbstractListener.__init__ still work.
    __deaf = False
    __eventQueue = None

    def __init__(self):
        self.__deaf = False
        self.__eventQueue = None

    def initialise(self, observed):
        """ Register self as a listener to observed

        :param observed: object providing ``register_listener``
        """
        assert observed is not None
        observed.register_listener(self)
        if self.notify_lock is None:
            self.notify_lock = list()

    def change_observed(self, old, new):
        """Hook with an empty default implementation.

        :param old: the previous observed object
        :param new: the new observed object
        """
        return

    def is_notification_locked(self):
        """Return True while ``notify_lock`` holds at least one entry."""
        return bool(self.notify_lock)

    def deaf(self, setDeaf=True):
        """Turn on/off the delivery of notifications to :meth:`notify`.

        :param setDeaf: True to ignore notifications, False to receive them
        """
        self.__deaf = setDeaf

    def queue_call_notifications(self, call, *args, **kwargs):
        """ Runs a call and queues notifications coming from that call to
        this listener. Once the call is finished, queued notifications are
        processed FIFO.

        The queue is flushed even when ``call`` raises, so the listener never
        stays stuck in queueing mode.

        :param call: callable invoked as ``call(*args, **kwargs)``
        """
        if self.__eventQueue is not None:
            # Nested use: keep feeding the queue already in place; the
            # outermost invocation delivers everything.
            call(*args, **kwargs)
            return
        self.__eventQueue = deque()
        try:
            call(*args, **kwargs)
        finally:
            self.__process_queued_calls()

    def __process_queued_calls(self):
        # Leave queueing mode first so that call_notify delivers directly.
        queue, self.__eventQueue = self.__eventQueue, None
        for sender, event in queue or ():
            self.call_notify(sender, event)

    def call_notify(self, sender, event=None):
        """ Basic implementation call directly notify function

        Sub class can override this method to implement different call
        strategy (like signal slot)

        :param sender: the observed object which send notification
        :param event: the data associated to the notification
        """
        # if we are running a call with delayed event delivery
        # we queue the events:
        if self.__eventQueue is not None:
            self.__eventQueue.append((sender, event))
        elif not self.__deaf:
            self.notify(sender, event)

    def notify(self, sender, event=None):
        """ This function is called by observed objects

        :param sender: the observed object which send notification
        :param event: the data associated to the notification
        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()
# Decorator function to protect an AbstractListener against notification
def lock_notify(method):
    """Decorate ``method`` so that ``self.notify_lock`` is non-empty while
    it runs.

    An entry is pushed on ``self.notify_lock`` before the call and popped
    afterwards, also when the method raises.

    :param method: the method to wrap; called as ``method(self, *args, **kwargs)``
    :returns: the wrapping function, which returns whatever ``method`` returns
    """
    # Local import: keeps this block independent of the module's import list.
    from functools import wraps

    @wraps(method)  # keep the wrapped method's name and docstring
    def wrapped(self, *args, **kwargs):
        if getattr(self, "notify_lock", None) is None:
            self.notify_lock = list()
        self.notify_lock.append(True)
        try:
            return method(self, *args, **kwargs)
        finally:
            self.notify_lock.pop()
    return wrapped