summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit/action_engine
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-09 17:54:21 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-09-07 08:49:15 -0700
commite1ef04492ebddab02de2f277079ae0c7448c3b40 (patch)
tree6e761c97ebcad5e804774515105e5e229707f323 /taskflow/tests/unit/action_engine
parentdb9fee947dd6614774275ab11884bf5b9a8c1233 (diff)
downloadtaskflow-e1ef04492ebddab02de2f277079ae0c7448c3b40.tar.gz
Translate the engine runner into a well defined state-machine
Instead of having a ad-hoc state-machine being used to perform the various runtime actions (performed when a engine is ran) we can gain a much more explict execution model by translating that ad-hoc state machine to an explicit one instead... This commit does just that, it adds a new fsm type that can be used to create, define and run state-machines that respond to various events (internal or external) and uses it in the runner action engine module to run the previously ad-hc/implicit state-machine. Implements blueprint runner-state-machine Change-Id: Id35633a9de707f3ffb1a4b7e9619af1be009317f
Diffstat (limited to 'taskflow/tests/unit/action_engine')
-rw-r--r--taskflow/tests/unit/action_engine/test_runner.py158
1 files changed, 148 insertions, 10 deletions
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py
index d7927f8..2e18f6b 100644
--- a/taskflow/tests/unit/action_engine/test_runner.py
+++ b/taskflow/tests/unit/action_engine/test_runner.py
@@ -18,17 +18,20 @@ import six
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
+from taskflow.engines.action_engine import runner
from taskflow.engines.action_engine import runtime
+from taskflow import exceptions as excp
from taskflow.patterns import linear_flow as lf
from taskflow import states as st
from taskflow import storage
from taskflow import test
from taskflow.tests import utils as test_utils
+from taskflow.types import fsm
from taskflow.utils import misc
from taskflow.utils import persistence_utils as pu
-class RunnerTest(test.TestCase):
+class _RunnerTestMixin(object):
def _make_runtime(self, flow, initial_state=None):
compilation = compiler.PatternCompiler().compile(flow)
flow_detail = pu.create_flow_detail(flow)
@@ -42,18 +45,20 @@ class RunnerTest(test.TestCase):
task_executor = executor.SerialTaskExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
- return runtime.Runtime(compiler.PatternCompiler().compile(flow),
- store, task_notifier, task_executor)
+ return runtime.Runtime(compilation, store,
+ task_notifier, task_executor)
+
+class RunnerTest(test.TestCase, _RunnerTestMixin):
def test_running(self):
flow = lf.Flow("root")
flow.add(*test_utils.make_many(1))
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
rt = self._make_runtime(flow, initial_state=st.SUSPENDED)
- self.assertFalse(rt.runner.is_running())
+ self.assertFalse(rt.runner.runnable())
def test_run_iterations(self):
flow = lf.Flow("root")
@@ -62,7 +67,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
it = rt.runner.run_iter()
state, failures = six.next(it)
@@ -94,7 +99,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
@@ -110,7 +115,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
@@ -128,7 +133,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = []
for state, failures in rt.runner.run_iter():
@@ -152,7 +157,7 @@ class RunnerTest(test.TestCase):
flow.add(*happy_tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = []
for state, failures in rt.runner.run_iter():
@@ -167,3 +172,136 @@ class RunnerTest(test.TestCase):
rt.storage.get_atom_state(happy_tasks[0].name))
self.assertEqual(st.FAILURE,
rt.storage.get_atom_state(sad_tasks[0].name))
+
+
+class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
+ def test_builder_manual_process(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 1, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+ self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke')
+
+ # Should now be pending...
+ self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name))
+
+ machine.initialize()
+ self.assertEqual(runner._UNDEFINED, machine.current_state)
+ self.assertFalse(machine.terminated)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+ last_state = machine.current_state
+
+ reaction, terminal = machine.process_event('start')
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.RESUMING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ 'start', *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.SCHEDULING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertEqual(st.WAITING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ # Should now be running...
+ self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name))
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.ANALYZING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertEqual(runner._GAME_OVER, machine.current_state)
+
+ # Should now be done...
+ self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_automatic_process(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 1, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
+ self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1])
+ self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_automatic_process_failure(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
+ self.assertEqual(1, len(memory.failures))
+
+ def test_builder_automatic_process_reverted(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
+ self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_expected_transition_occurrences(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 10, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ transitions = list(machine.run_iter('start'))
+
+ occurrences = dict((t, transitions.count(t)) for t in transitions)
+ self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING)))
+ self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING)))
+ self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING)))
+ self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS)))
+ self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING)))
+
+ self.assertEqual(0, len(memory.next_nodes))
+ self.assertEqual(0, len(memory.not_done))
+ self.assertEqual(0, len(memory.failures))