summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@gmail.com>2015-06-17 23:34:31 -0700
committerJoshua Harlow <harlowja@gmail.com>2015-06-18 07:41:25 -0700
commit2fa4af7a24584b211ca549f2df715c2d126360c8 (patch)
tree87bbb400dca07e484efc22961670a5f963e13c85 /taskflow
parentcaf37be3456dd3a0a03624afc8c25bcd20459e02 (diff)
downloadtaskflow-2fa4af7a24584b211ca549f2df715c2d126360c8.tar.gz
Split-off the additional retry states from the task states
Split the states that are not task states (but are retry states) into there own additional set and then use that set and a new function to validate the transition at other locations in the code-base. This makes the transitions that are valid for tasks/retries easily viewable, more easy to read and understand, and more correct (instead of being a mix of task + retry atom transitions and states). Change-Id: I9515c19daf59a21e581f51e757ece2050f348214
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/engines/action_engine/actions/base.py4
-rw-r--r--taskflow/engines/action_engine/actions/retry.py4
-rw-r--r--taskflow/engines/action_engine/actions/task.py4
-rw-r--r--taskflow/engines/action_engine/analyzer.py49
-rw-r--r--taskflow/engines/action_engine/engine.py1
-rw-r--r--taskflow/engines/action_engine/runtime.py88
-rw-r--r--taskflow/states.py41
-rw-r--r--taskflow/tests/unit/action_engine/test_runner.py6
-rw-r--r--taskflow/tests/unit/test_check_transition.py17
9 files changed, 131 insertions, 83 deletions
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py
index 869ef22..369a6c6 100644
--- a/taskflow/engines/action_engine/actions/base.py
+++ b/taskflow/engines/action_engine/actions/base.py
@@ -35,7 +35,3 @@ class Action(object):
def __init__(self, storage, notifier):
self._storage = storage
self._notifier = notifier
-
- @abc.abstractmethod
- def handles(self, atom):
- """Checks if this action handles the provided atom."""
diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py
index f69d5a5..c8cad50 100644
--- a/taskflow/engines/action_engine/actions/retry.py
+++ b/taskflow/engines/action_engine/actions/retry.py
@@ -48,10 +48,6 @@ class RetryAction(base.Action):
super(RetryAction, self).__init__(storage, notifier)
self._executor = futures.SynchronousExecutor()
- @staticmethod
- def handles(atom):
- return isinstance(atom, retry_atom.Retry)
-
def _get_retry_args(self, retry, addons=None):
arguments = self._storage.fetch_mapped_args(
retry.rebind,
diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py
index 2a11bf8..ab4b50d 100644
--- a/taskflow/engines/action_engine/actions/task.py
+++ b/taskflow/engines/action_engine/actions/task.py
@@ -32,10 +32,6 @@ class TaskAction(base.Action):
super(TaskAction, self).__init__(storage, notifier)
self._task_executor = task_executor
- @staticmethod
- def handles(atom):
- return isinstance(atom, task_atom.BaseTask)
-
def _is_identity_transition(self, old_state, state, task, progress):
if state in base.SAVE_RESULT_STATES:
# saving result is never identity transition
diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py
index a8f20c0..bef7b8b 100644
--- a/taskflow/engines/action_engine/analyzer.py
+++ b/taskflow/engines/action_engine/analyzer.py
@@ -34,6 +34,7 @@ class Analyzer(object):
def __init__(self, runtime):
self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph
+ self._check_atom_transition = runtime.check_atom_transition
def get_next_nodes(self, node=None):
if node is None:
@@ -93,37 +94,37 @@ class Analyzer(object):
available_nodes.append(node)
return available_nodes
- def _is_ready_for_execute(self, task):
- """Checks if task is ready to be executed."""
- state = self.get_state(task)
- intention = self._storage.get_atom_intention(task.name)
- transition = st.check_task_transition(state, st.RUNNING)
+ def _is_ready_for_execute(self, atom):
+ """Checks if atom is ready to be executed."""
+ state = self.get_state(atom)
+ intention = self._storage.get_atom_intention(atom.name)
+ transition = self._check_atom_transition(atom, state, st.RUNNING)
if not transition or intention != st.EXECUTE:
return False
- task_names = []
- for prev_task in self._execution_graph.predecessors(task):
- task_names.append(prev_task.name)
+ atom_names = []
+ for prev_atom in self._execution_graph.predecessors(atom):
+ atom_names.append(prev_atom.name)
- task_states = self._storage.get_atoms_states(task_names)
+ atom_states = self._storage.get_atoms_states(atom_names)
return all(state == st.SUCCESS and intention == st.EXECUTE
- for state, intention in six.itervalues(task_states))
+ for state, intention in six.itervalues(atom_states))
- def _is_ready_for_revert(self, task):
- """Checks if task is ready to be reverted."""
- state = self.get_state(task)
- intention = self._storage.get_atom_intention(task.name)
- transition = st.check_task_transition(state, st.REVERTING)
+ def _is_ready_for_revert(self, atom):
+ """Checks if atom is ready to be reverted."""
+ state = self.get_state(atom)
+ intention = self._storage.get_atom_intention(atom.name)
+ transition = self._check_atom_transition(atom, state, st.REVERTING)
if not transition or intention not in (st.REVERT, st.RETRY):
return False
- task_names = []
- for prev_task in self._execution_graph.successors(task):
- task_names.append(prev_task.name)
+ atom_names = []
+ for prev_atom in self._execution_graph.successors(atom):
+ atom_names.append(prev_atom.name)
- task_states = self._storage.get_atoms_states(task_names)
+ atom_states = self._storage.get_atoms_states(atom_names)
return all(state in (st.PENDING, st.REVERTED)
- for state, intention in six.itervalues(task_states))
+ for state, intention in six.itervalues(atom_states))
def iterate_subgraph(self, atom):
"""Iterates a subgraph connected to given atom."""
@@ -148,10 +149,10 @@ class Analyzer(object):
return self._execution_graph.node[atom].get('retry')
def is_success(self):
- for node in self._execution_graph.nodes_iter():
- if self.get_state(node) != st.SUCCESS:
+ for atom in self.iterate_all_nodes():
+ if self.get_state(atom) != st.SUCCESS:
return False
return True
- def get_state(self, node):
- return self._storage.get_atom_state(node.name)
+ def get_state(self, atom):
+ return self._storage.get_atom_state(atom.name)
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 124b8a5..df8d1d3 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -295,6 +295,7 @@ class ActionEngine(base.Engine):
self.storage,
self.atom_notifier,
self._task_executor)
+ self._runtime.compile()
self._compiled = True
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index fc16fd9..0fba861 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import functools
+
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
@@ -22,6 +24,7 @@ from taskflow.engines.action_engine import runner as ru
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow import states as st
+from taskflow import task
from taskflow.utils import misc
@@ -38,7 +41,30 @@ class Runtime(object):
self._task_executor = task_executor
self._storage = storage
self._compilation = compilation
- self._walkers_to_names = {}
+ self._atom_cache = {}
+
+ def compile(self):
+ # Build out a cache of commonly used item that are associated
+ # with the contained atoms (by name), and are useful to have for
+ # quick lookup on...
+ change_state_handlers = {
+ 'task': functools.partial(self.task_action.change_state,
+ progress=0.0),
+ 'retry': self.retry_action.change_state,
+ }
+ for atom in self.analyzer.iterate_all_nodes():
+ metadata = {}
+ walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
+ if isinstance(atom, task.BaseTask):
+ check_transition_handler = st.check_task_transition
+ change_state_handler = change_state_handlers['task']
+ else:
+ check_transition_handler = st.check_retry_transition
+ change_state_handler = change_state_handlers['retry']
+ metadata['scope_walker'] = walker
+ metadata['check_transition_handler'] = check_transition_handler
+ metadata['change_state_handler'] = change_state_handler
+ self._atom_cache[atom.name] = metadata
@property
def compilation(self):
@@ -75,56 +101,52 @@ class Runtime(object):
self._atom_notifier,
self._task_executor)
+ def check_atom_transition(self, atom, current_state, target_state):
+ """Checks if the atom can transition to the provided target state."""
+ # This does not check if the name exists (since this is only used
+ # internally to the engine, and is not exposed to atoms that will
+ # not exist and therefore doesn't need to handle that case).
+ metadata = self._atom_cache[atom.name]
+ check_transition_handler = metadata['check_transition_handler']
+ return check_transition_handler(current_state, target_state)
+
def fetch_scopes_for(self, atom_name):
"""Fetches a walker of the visible scopes for the given atom."""
try:
- return self._walkers_to_names[atom_name]
+ metadata = self._atom_cache[atom_name]
except KeyError:
- atom = None
- for node in self.analyzer.iterate_all_nodes():
- if node.name == atom_name:
- atom = node
- break
- if atom is not None:
- walker = sc.ScopeWalker(self.compilation, atom,
- names_only=True)
- self._walkers_to_names[atom_name] = walker
- else:
- walker = None
- return walker
+ # This signals to the caller that there is no walker for whatever
+ # atom name was given that doesn't really have any associated atom
+ # known to be named with that name; this is done since the storage
+ # layer will call into this layer to fetch a scope for a named
+ # atom and users can provide random names that do not actually
+ # exist...
+ return None
+ else:
+ return metadata['scope_walker']
# Various helper methods used by the runtime components; not for public
# consumption...
- def reset_nodes(self, nodes, state=st.PENDING, intention=st.EXECUTE):
+ def reset_nodes(self, atoms, state=st.PENDING, intention=st.EXECUTE):
tweaked = []
- node_state_handlers = [
- (self.task_action, {'progress': 0.0}),
- (self.retry_action, {}),
- ]
- for node in nodes:
+ for atom in atoms:
+ metadata = self._atom_cache[atom.name]
if state or intention:
- tweaked.append((node, state, intention))
+ tweaked.append((atom, state, intention))
if state:
- handled = False
- for h, kwargs in node_state_handlers:
- if h.handles(node):
- h.change_state(node, state, **kwargs)
- handled = True
- break
- if not handled:
- raise TypeError("Unknown how to reset state of"
- " node '%s' (%s)" % (node, type(node)))
+ change_state_handler = metadata['change_state_handler']
+ change_state_handler(atom, state)
if intention:
- self.storage.set_atom_intention(node.name, intention)
+ self.storage.set_atom_intention(atom.name, intention)
return tweaked
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
return self.reset_nodes(self.analyzer.iterate_all_nodes(),
state=state, intention=intention)
- def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE):
- return self.reset_nodes(self.analyzer.iterate_subgraph(node),
+ def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):
+ return self.reset_nodes(self.analyzer.iterate_subgraph(atom),
state=state, intention=intention)
def retry_subflow(self, retry):
diff --git a/taskflow/states.py b/taskflow/states.py
index c5ea579..265d6b2 100644
--- a/taskflow/states.py
+++ b/taskflow/states.py
@@ -69,10 +69,10 @@ _ALLOWED_JOB_TRANSITIONS = frozenset((
def check_job_transition(old_state, new_state):
- """Check that job can transition from old_state to new_state.
+ """Check that job can transition from from ``old_state`` to ``new_state``.
- If transition can be performed, it returns True. If transition
- should be ignored, it returns False. If transition is not
+ If transition can be performed, it returns true. If transition
+ should be ignored, it returns false. If transition is not
valid, it raises an InvalidState exception.
"""
if old_state == new_state:
@@ -138,10 +138,10 @@ _IGNORED_FLOW_TRANSITIONS = frozenset(
def check_flow_transition(old_state, new_state):
- """Check that flow can transition from old_state to new_state.
+ """Check that flow can transition from ``old_state`` to ``new_state``.
- If transition can be performed, it returns True. If transition
- should be ignored, it returns False. If transition is not
+ If transition can be performed, it returns true. If transition
+ should be ignored, it returns false. If transition is not
valid, it raises an InvalidState exception.
"""
if old_state == new_state:
@@ -171,18 +171,37 @@ _ALLOWED_TASK_TRANSITIONS = frozenset((
(REVERTING, FAILURE), # revert failed
(REVERTED, PENDING), # try again
-
- (SUCCESS, RETRYING), # retrying retry controller
- (RETRYING, RUNNING), # run retry controller that has been retrying
))
def check_task_transition(old_state, new_state):
- """Check that task can transition from old_state to new_state.
+ """Check that task can transition from ``old_state`` to ``new_state``.
- If transition can be performed, it returns True, False otherwise.
+ If transition can be performed, it returns true, false otherwise.
"""
pair = (old_state, new_state)
if pair in _ALLOWED_TASK_TRANSITIONS:
return True
return False
+
+
+# Retry state transitions
+# See: http://docs.openstack.org/developer/taskflow/states.html#retry
+
+_ALLOWED_RETRY_TRANSITIONS = list(_ALLOWED_TASK_TRANSITIONS)
+_ALLOWED_RETRY_TRANSITIONS.extend([
+ (SUCCESS, RETRYING), # retrying retry controller
+ (RETRYING, RUNNING), # run retry controller that has been retrying
+])
+_ALLOWED_RETRY_TRANSITIONS = frozenset(_ALLOWED_RETRY_TRANSITIONS)
+
+
+def check_retry_transition(old_state, new_state):
+ """Check that retry can transition from ``old_state`` to ``new_state``.
+
+ If transition can be performed, it returns true, false otherwise.
+ """
+ pair = (old_state, new_state)
+ if pair in _ALLOWED_RETRY_TRANSITIONS:
+ return True
+ return False
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py
index 98ae0e2..eb0f0a2 100644
--- a/taskflow/tests/unit/action_engine/test_runner.py
+++ b/taskflow/tests/unit/action_engine/test_runner.py
@@ -45,8 +45,10 @@ class _RunnerTestMixin(object):
task_executor = executor.SerialTaskExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
- return runtime.Runtime(compilation, store,
- task_notifier, task_executor)
+ r = runtime.Runtime(compilation, store,
+ task_notifier, task_executor)
+ r.compile()
+ return r
class RunnerTest(test.TestCase, _RunnerTestMixin):
diff --git a/taskflow/tests/unit/test_check_transition.py b/taskflow/tests/unit/test_check_transition.py
index bed7bc9..7c820fd 100644
--- a/taskflow/tests/unit/test_check_transition.py
+++ b/taskflow/tests/unit/test_check_transition.py
@@ -87,7 +87,7 @@ class CheckTaskTransitionTest(TransitionTest):
def test_from_success_state(self):
self.assertTransitions(from_state=states.SUCCESS,
- allowed=(states.REVERTING, states.RETRYING),
+ allowed=(states.REVERTING,),
ignored=(states.RUNNING, states.SUCCESS,
states.PENDING, states.FAILURE,
states.REVERTED))
@@ -112,6 +112,21 @@ class CheckTaskTransitionTest(TransitionTest):
states.RUNNING,
states.SUCCESS, states.FAILURE))
+
+class CheckRetryTransitionTest(CheckTaskTransitionTest):
+
+ def setUp(self):
+ super(CheckRetryTransitionTest, self).setUp()
+ self.check_transition = states.check_retry_transition
+ self.transition_exc_regexp = '^Retry transition.*not allowed'
+
+ def test_from_success_state(self):
+ self.assertTransitions(from_state=states.SUCCESS,
+ allowed=(states.REVERTING, states.RETRYING),
+ ignored=(states.RUNNING, states.SUCCESS,
+ states.PENDING, states.FAILURE,
+ states.REVERTED))
+
def test_from_retrying_state(self):
self.assertTransitions(from_state=states.RETRYING,
allowed=(states.RUNNING,),