diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-03-16 18:57:23 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-07-01 06:04:31 +0000 |
| commit | 2b827e1e363a90e4037172ccb0d57ac0873497fb (patch) | |
| tree | 667c8cced86cfcdedec3678315ee3a796b058f43 /taskflow/patterns | |
| parent | e012d8de0c828c9f9037752dd0377012a638bca3 (diff) | |
| download | taskflow-2b827e1e363a90e4037172ccb0d57ac0873497fb.tar.gz | |
Add support for conditional execution
To make it possible to alter the runtime flow via a simple
conditional like structure make it possible to have the graph
flow link function take a decider that is expected to be some
callable that will decide (via a boolean return) whether the
edge should actually be traversed when running. When a decider
returns false; the affected + successors will be set into the
IGNORE state and they will be exempt from future runtime and
scheduling decisions.
Part of blueprint taskflow-conditional-execution
Change-Id: Iab0ee46f86d6b8e747911174d54a7295b3fa404d
Diffstat (limited to 'taskflow/patterns')
| -rw-r--r-- | taskflow/patterns/graph_flow.py | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 7d407c8..50a4d61 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -16,6 +16,8 @@ import collections +import six + from taskflow import exceptions as exc from taskflow import flow from taskflow.types import graph as gr @@ -66,16 +68,20 @@ class Flow(flow.Flow): #: Extracts the unsatisified symbol requirements of a single node. _unsatisfied_requires = staticmethod(_unsatisfied_requires) - def link(self, u, v): + def link(self, u, v, decider=None): """Link existing node u as a runtime dependency of existing node v.""" if not self._graph.has_node(u): raise ValueError("Node '%s' not found to link from" % (u)) if not self._graph.has_node(v): raise ValueError("Node '%s' not found to link to" % (v)) - self._swap(self._link(u, v, manual=True)) + if decider is not None: + if not six.callable(decider): + raise ValueError("Decider boolean callback must be callable") + self._swap(self._link(u, v, manual=True, decider=decider)) return self - def _link(self, u, v, graph=None, reason=None, manual=False): + def _link(self, u, v, graph=None, + reason=None, manual=False, decider=None): mutable_graph = True if graph is None: graph = self._graph @@ -85,6 +91,8 @@ class Flow(flow.Flow): attrs = graph.get_edge_data(u, v) if not attrs: attrs = {} + if decider is not None: + attrs[flow.LINK_DECIDER] = decider if manual: attrs[flow.LINK_MANUAL] = True if reason is not None: @@ -281,9 +289,9 @@ class TargetedFlow(Flow): self._subgraph = None return self - def link(self, u, v): + def link(self, u, v, decider=None): """Link existing node u as a runtime dependency of existing node v.""" - super(TargetedFlow, self).link(u, v) + super(TargetedFlow, self).link(u, v, decider=decider) # reset cached subgraph, in case it was affected self._subgraph = None return self |
