summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-03-16 18:57:23 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-07-01 06:04:31 +0000
commit2b827e1e363a90e4037172ccb0d57ac0873497fb (patch)
tree667c8cced86cfcdedec3678315ee3a796b058f43 /taskflow/patterns
parente012d8de0c828c9f9037752dd0377012a638bca3 (diff)
downloadtaskflow-2b827e1e363a90e4037172ccb0d57ac0873497fb.tar.gz
Add support for conditional execution
To make it possible to alter the runtime flow via a simple conditional like structure make it possible to have the graph flow link function take a decider that is expected to be some callable that will decide (via a boolean return) whether the edge should actually be traversed when running. When a decider returns false; the affected + successors will be set into the IGNORE state and they will be exempt from future runtime and scheduling decisions. Part of blueprint taskflow-conditional-execution Change-Id: Iab0ee46f86d6b8e747911174d54a7295b3fa404d
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/graph_flow.py18
1 files changed, 13 insertions, 5 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 7d407c8..50a4d61 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -16,6 +16,8 @@
import collections
+import six
+
from taskflow import exceptions as exc
from taskflow import flow
from taskflow.types import graph as gr
@@ -66,16 +68,20 @@ class Flow(flow.Flow):
#: Extracts the unsatisified symbol requirements of a single node.
_unsatisfied_requires = staticmethod(_unsatisfied_requires)
- def link(self, u, v):
+ def link(self, u, v, decider=None):
"""Link existing node u as a runtime dependency of existing node v."""
if not self._graph.has_node(u):
raise ValueError("Node '%s' not found to link from" % (u))
if not self._graph.has_node(v):
raise ValueError("Node '%s' not found to link to" % (v))
- self._swap(self._link(u, v, manual=True))
+ if decider is not None:
+ if not six.callable(decider):
+ raise ValueError("Decider boolean callback must be callable")
+ self._swap(self._link(u, v, manual=True, decider=decider))
return self
- def _link(self, u, v, graph=None, reason=None, manual=False):
+ def _link(self, u, v, graph=None,
+ reason=None, manual=False, decider=None):
mutable_graph = True
if graph is None:
graph = self._graph
@@ -85,6 +91,8 @@ class Flow(flow.Flow):
attrs = graph.get_edge_data(u, v)
if not attrs:
attrs = {}
+ if decider is not None:
+ attrs[flow.LINK_DECIDER] = decider
if manual:
attrs[flow.LINK_MANUAL] = True
if reason is not None:
@@ -281,9 +289,9 @@ class TargetedFlow(Flow):
self._subgraph = None
return self
- def link(self, u, v):
+ def link(self, u, v, decider=None):
"""Link existing node u as a runtime dependency of existing node v."""
- super(TargetedFlow, self).link(u, v)
+ super(TargetedFlow, self).link(u, v, decider=decider)
# reset cached subgraph, in case it was affected
self._subgraph = None
return self