diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-07-09 17:54:21 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2014-09-07 08:49:15 -0700 |
| commit | e1ef04492ebddab02de2f277079ae0c7448c3b40 (patch) | |
| tree | 6e761c97ebcad5e804774515105e5e229707f323 /taskflow/tests/unit/action_engine | |
| parent | db9fee947dd6614774275ab11884bf5b9a8c1233 (diff) | |
| download | taskflow-e1ef04492ebddab02de2f277079ae0c7448c3b40.tar.gz | |
Translate the engine runner into a well defined state-machine
Instead of having a ad-hoc state-machine being used to perform
the various runtime actions (performed when a engine is ran) we
can gain a much more explict execution model by translating that
ad-hoc state machine to an explicit one instead...
This commit does just that, it adds a new fsm type that can be
used to create, define and run state-machines that respond to
various events (internal or external) and uses it in the runner action
engine module to run the previously ad-hc/implicit state-machine.
Implements blueprint runner-state-machine
Change-Id: Id35633a9de707f3ffb1a4b7e9619af1be009317f
Diffstat (limited to 'taskflow/tests/unit/action_engine')
| -rw-r--r-- | taskflow/tests/unit/action_engine/test_runner.py | 158 |
1 files changed, 148 insertions, 10 deletions
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index d7927f8..2e18f6b 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -18,17 +18,20 @@ import six from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor +from taskflow.engines.action_engine import runner from taskflow.engines.action_engine import runtime +from taskflow import exceptions as excp from taskflow.patterns import linear_flow as lf from taskflow import states as st from taskflow import storage from taskflow import test from taskflow.tests import utils as test_utils +from taskflow.types import fsm from taskflow.utils import misc from taskflow.utils import persistence_utils as pu -class RunnerTest(test.TestCase): +class _RunnerTestMixin(object): def _make_runtime(self, flow, initial_state=None): compilation = compiler.PatternCompiler().compile(flow) flow_detail = pu.create_flow_detail(flow) @@ -42,18 +45,20 @@ class RunnerTest(test.TestCase): task_executor = executor.SerialTaskExecutor() task_executor.start() self.addCleanup(task_executor.stop) - return runtime.Runtime(compiler.PatternCompiler().compile(flow), - store, task_notifier, task_executor) + return runtime.Runtime(compilation, store, + task_notifier, task_executor) + +class RunnerTest(test.TestCase, _RunnerTestMixin): def test_running(self): flow = lf.Flow("root") flow.add(*test_utils.make_many(1)) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) rt = self._make_runtime(flow, initial_state=st.SUSPENDED) - self.assertFalse(rt.runner.is_running()) + self.assertFalse(rt.runner.runnable()) def test_run_iterations(self): flow = lf.Flow("root") @@ -62,7 +67,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) it = rt.runner.run_iter() state, failures = six.next(it) @@ -94,7 +99,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = list(rt.runner.run_iter()) state, failures = transitions[-1] @@ -110,7 +115,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = list(rt.runner.run_iter()) state, failures = transitions[-1] @@ -128,7 +133,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = [] for state, failures in rt.runner.run_iter(): @@ -152,7 +157,7 @@ class RunnerTest(test.TestCase): flow.add(*happy_tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = [] for state, failures in rt.runner.run_iter(): @@ -167,3 +172,136 @@ class RunnerTest(test.TestCase): rt.storage.get_atom_state(happy_tasks[0].name)) self.assertEqual(st.FAILURE, rt.storage.get_atom_state(sad_tasks[0].name)) + + +class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): + def test_builder_manual_process(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 1, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke') + + # Should now be pending... + self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name)) + + machine.initialize() + self.assertEqual(runner._UNDEFINED, machine.current_state) + self.assertFalse(machine.terminated) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + last_state = machine.current_state + + reaction, terminal = machine.process_event('start') + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.RESUMING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + 'start', *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.SCHEDULING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertEqual(st.WAITING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + # Should now be running... + self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name)) + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.ANALYZING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertEqual(runner._GAME_OVER, machine.current_state) + + # Should now be done... + self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_automatic_process(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 1, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) + self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1]) + self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_automatic_process_failure(self): + flow = lf.Flow("root") + tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) + self.assertEqual(1, len(memory.failures)) + + def test_builder_automatic_process_reverted(self): + flow = lf.Flow("root") + tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) + self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_expected_transition_occurrences(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 10, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + transitions = list(machine.run_iter('start')) + + occurrences = dict((t, transitions.count(t)) for t in transitions) + self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING))) + self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING))) + self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING))) + self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS))) + self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING))) + + self.assertEqual(0, len(memory.next_nodes)) + self.assertEqual(0, len(memory.not_done)) + self.assertEqual(0, len(memory.failures)) |
