summaryrefslogtreecommitdiff
path: root/taskflow/engines/action_engine/engine.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-09-04 13:14:25 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-10-01 22:38:30 -0700
commit79d25e69e8300db5debdfd717ffd80f91c246c10 (patch)
tree134e9764a021dc190a1b158fb27f222d9304908e /taskflow/engines/action_engine/engine.py
parentba4704cd18ab6d799a2de59bdf0feab9b5430a30 (diff)
downloadtaskflow-79d25e69e8300db5debdfd717ffd80f91c246c10.tar.gz
Simplify flow action engine compilation1.22.0
Instead of the added complexity of discarding flow nodes we can simplify the compilation process by just retaining them and jumping over them in further iteration and graph and tree runtime usage. This change moves toward a model that does just this, which makes it also easier to in the future use the newly added flow graph nodes to do meaningful things (like use them as a point to change which flow_detail is used). Change-Id: Icb1695f4b995a0392f940837514774768f222db4
Diffstat (limited to 'taskflow/engines/action_engine/engine.py')
-rw-r--r--taskflow/engines/action_engine/engine.py28
1 files changed, 13 insertions, 15 deletions
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index cc6b1ac..74e150c 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -241,11 +241,10 @@ class ActionEngine(base.Engine):
transient = strutils.bool_from_string(
self._options.get('inject_transient', True))
self.storage.ensure_atoms(
- self._compilation.execution_graph.nodes_iter())
- for node in self._compilation.execution_graph.nodes_iter():
- if node.inject:
- self.storage.inject_atom_args(node.name,
- node.inject,
+ self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ if atom.inject:
+ self.storage.inject_atom_args(atom.name, atom.inject,
transient=transient)
@fasteners.locked
@@ -255,8 +254,8 @@ class ActionEngine(base.Engine):
# flow/task provided or storage provided, if there are still missing
# dependencies then this flow will fail at runtime (which we can avoid
# by failing at validation time).
- execution_graph = self._compilation.execution_graph
if LOG.isEnabledFor(logging.BLATHER):
+ execution_graph = self._compilation.execution_graph
LOG.blather("Validating scoping and argument visibility for"
" execution graph with %s nodes and %s edges with"
" density %0.3f", execution_graph.number_of_nodes(),
@@ -269,18 +268,17 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
- fetch_func = self.storage.fetch_unsatisfied_args
- for node in execution_graph.nodes_iter():
- node_missing = fetch_func(node.name, node.rebind,
- optional_args=node.optional)
- if node_missing:
- cause = exc.MissingDependencies(node,
- sorted(node_missing),
+ for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
+ atom_missing = self.storage.fetch_unsatisfied_args(
+ atom.name, atom.rebind, optional_args=atom.optional)
+ if atom_missing:
+ cause = exc.MissingDependencies(atom,
+ sorted(atom_missing),
cause=last_cause)
last_cause = cause
- last_node = node
+ last_node = atom
missing_nodes += 1
- missing.update(node_missing)
+ missing.update(atom_missing)
if missing:
# For when a task is provided (instead of a flow) and that
# task is the only item in the graph and its missing deps, avoid