Source code for openalea.core.dataflow_state

# -*- python -*-
#
#       OpenAlea.Core
#
#       Copyright 2006-2009 INRIA - CIRAD - INRA
#
#       File author(s): Jerome Chopard <revesansparole@gmail.com>
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       OpenAlea WebSite: http://openalea.gforge.inria.fr
#
###############################################################################
""" This module provide an implementation of a way
to store data exchanged between nodes of a dataflow.
"""

__license__ = "Cecill-C"
__revision__ = " $Id$ "

import functools

[docs] def cmp(x, y): """ Replacement for built-in function cmp that was removed in Python 3 Compare the two objects x and y and return an integer according to the outcome. The return value is negative if x < y, zero if x == y and strictly positive if x > y. """ return (x > y) - (x < y)
[docs] class DataflowState(object): """ Store outputs of node and provide a way to access them """ def __init__(self, dataflow): """ constructor args: - dataflow (Dataflow) """ self._dataflow = dataflow self._state = {}
[docs] def clear(self): """Clear state """ self._state.clear()
[docs] def reinit(self): """ Remove all data stored except for the one associated to lonely input ports. """ df = self._dataflow # save state save = dict((pid, dat) for pid, dat in self._state.items() if df.is_in_port(pid) and df.nb_connections(pid) == 0) # clear self.clear() # resume state self._state.update(save)
[docs] def is_ready_for_evaluation(self): """ Test wether the state contains enough information to evaluate the associated dataflow. Simply check that each lonely input port has some data attached to it. """ df = self._dataflow state = self._state return all([pid in state for pid in df.in_ports() if df.nb_connections(pid) == 0])
[docs] def is_valid(self): """ Test wether all data have been computed """ df = self._dataflow state = self._state if not self.is_ready_for_evaluation(): return False # check that all nodes have been evaluated if not all([pid in state for pid in df.out_ports()]): return False return True
[docs] def cmp_port_priority(self, pid1, pid2): """ Compare port priority. Compare first x position of actors then use pids""" df = self._dataflow try: node1 = df.actor(df.vertex(pid1)) except KeyError: return cmp(pid1, pid2) try: node2 = df.actor(df.vertex(pid2)) except KeyError: return cmp(pid1, pid2) p1 = node1.get_ad_hoc_dict().get_metadata('position')[0] p2 = node2.get_ad_hoc_dict().get_metadata('position')[0] ret = cmp(p1, p2) if ret != 0: return ret return cmp(pid1, pid2)
[docs] def get_data(self, pid): """ Retrieve data associated with a port. If pid is an output port, retrieve single item of data if pid is an input port, retrieve data on all output ports connected to this port and return a list of it or a single item if only one port connected args: - pid (pid): id of port either in or out """ df = self._dataflow state = self._state if pid in state: # either out_port or lonely in_port return state[pid] elif df.is_out_port(pid): raise KeyError("value not set for this port") else: npids = list(df.connected_ports(pid)) if len(npids) == 0: raise KeyError("lonely in_port not set") elif len(npids) == 1: return self.get_data(npids[0]) else: npids.sort(key = functools.cmp_to_key(self.cmp_port_priority)) return [self.get_data(pid) for pid in npids]
[docs] def set_data(self, pid, data): """ Store data on a port. This function does not test that the port is an output port args: - pid (pid): id of port - data (any) """ self._state[pid] = data