diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-07-09 17:54:21 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@gmail.com> | 2014-09-07 08:49:15 -0700 |
| commit | e1ef04492ebddab02de2f277079ae0c7448c3b40 (patch) | |
| tree | 6e761c97ebcad5e804774515105e5e229707f323 /taskflow/tests/unit | |
| parent | db9fee947dd6614774275ab11884bf5b9a8c1233 (diff) | |
| download | taskflow-e1ef04492ebddab02de2f277079ae0c7448c3b40.tar.gz | |
Translate the engine runner into a well defined state-machine
Instead of having a ad-hoc state-machine being used to perform
the various runtime actions (performed when a engine is ran) we
can gain a much more explict execution model by translating that
ad-hoc state machine to an explicit one instead...
This commit does just that, it adds a new fsm type that can be
used to create, define and run state-machines that respond to
various events (internal or external) and uses it in the runner action
engine module to run the previously ad-hc/implicit state-machine.
Implements blueprint runner-state-machine
Change-Id: Id35633a9de707f3ffb1a4b7e9619af1be009317f
Diffstat (limited to 'taskflow/tests/unit')
| -rw-r--r-- | taskflow/tests/unit/action_engine/test_runner.py | 158 | ||||
| -rw-r--r-- | taskflow/tests/unit/test_types.py | 150 |
2 files changed, 297 insertions, 11 deletions
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index d7927f8..2e18f6b 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -18,17 +18,20 @@ import six from taskflow.engines.action_engine import compiler from taskflow.engines.action_engine import executor +from taskflow.engines.action_engine import runner from taskflow.engines.action_engine import runtime +from taskflow import exceptions as excp from taskflow.patterns import linear_flow as lf from taskflow import states as st from taskflow import storage from taskflow import test from taskflow.tests import utils as test_utils +from taskflow.types import fsm from taskflow.utils import misc from taskflow.utils import persistence_utils as pu -class RunnerTest(test.TestCase): +class _RunnerTestMixin(object): def _make_runtime(self, flow, initial_state=None): compilation = compiler.PatternCompiler().compile(flow) flow_detail = pu.create_flow_detail(flow) @@ -42,18 +45,20 @@ class RunnerTest(test.TestCase): task_executor = executor.SerialTaskExecutor() task_executor.start() self.addCleanup(task_executor.stop) - return runtime.Runtime(compiler.PatternCompiler().compile(flow), - store, task_notifier, task_executor) + return runtime.Runtime(compilation, store, + task_notifier, task_executor) + +class RunnerTest(test.TestCase, _RunnerTestMixin): def test_running(self): flow = lf.Flow("root") flow.add(*test_utils.make_many(1)) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) rt = self._make_runtime(flow, initial_state=st.SUSPENDED) - self.assertFalse(rt.runner.is_running()) + self.assertFalse(rt.runner.runnable()) def test_run_iterations(self): flow = lf.Flow("root") @@ -62,7 +67,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) it = rt.runner.run_iter() state, failures = six.next(it) @@ -94,7 +99,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = list(rt.runner.run_iter()) state, failures = transitions[-1] @@ -110,7 +115,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = list(rt.runner.run_iter()) state, failures = transitions[-1] @@ -128,7 +133,7 @@ class RunnerTest(test.TestCase): flow.add(*tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = [] for state, failures in rt.runner.run_iter(): @@ -152,7 +157,7 @@ class RunnerTest(test.TestCase): flow.add(*happy_tasks) rt = self._make_runtime(flow, initial_state=st.RUNNING) - self.assertTrue(rt.runner.is_running()) + self.assertTrue(rt.runner.runnable()) transitions = [] for state, failures in rt.runner.run_iter(): @@ -167,3 +172,136 @@ class RunnerTest(test.TestCase): rt.storage.get_atom_state(happy_tasks[0].name)) self.assertEqual(st.FAILURE, rt.storage.get_atom_state(sad_tasks[0].name)) + + +class RunnerBuilderTest(test.TestCase, _RunnerTestMixin): + def test_builder_manual_process(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 1, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke') + + # Should now be pending... + self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name)) + + machine.initialize() + self.assertEqual(runner._UNDEFINED, machine.current_state) + self.assertFalse(machine.terminated) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + last_state = machine.current_state + + reaction, terminal = machine.process_event('start') + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.RESUMING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + 'start', *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.SCHEDULING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertEqual(st.WAITING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + # Should now be running... + self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name)) + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertIsNotNone(reaction) + self.assertEqual(st.ANALYZING, machine.current_state) + self.assertRaises(excp.NotFound, machine.process_event, 'poke') + + last_state = machine.current_state + cb, args, kwargs = reaction + next_event = cb(last_state, machine.current_state, + next_event, *args, **kwargs) + reaction, terminal = machine.process_event(next_event) + self.assertFalse(terminal) + self.assertEqual(runner._GAME_OVER, machine.current_state) + + # Should now be done... + self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_automatic_process(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 1, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0]) + self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1]) + self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_automatic_process_failure(self): + flow = lf.Flow("root") + tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1]) + self.assertEqual(1, len(memory.failures)) + + def test_builder_automatic_process_reverted(self): + flow = lf.Flow("root") + tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + self.assertTrue(rt.runner.builder.runnable()) + + transitions = list(machine.run_iter('start')) + self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1]) + self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name)) + + def test_builder_expected_transition_occurrences(self): + flow = lf.Flow("root") + tasks = test_utils.make_many( + 10, task_cls=test_utils.TaskNoRequiresNoReturns) + flow.add(*tasks) + + rt = self._make_runtime(flow, initial_state=st.RUNNING) + machine, memory = rt.runner.builder.build() + transitions = list(machine.run_iter('start')) + + occurrences = dict((t, transitions.count(t)) for t in transitions) + self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING))) + self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING))) + self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING))) + self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS))) + self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING))) + + self.assertEqual(0, len(memory.next_nodes)) + self.assertEqual(0, len(memory.not_done)) + self.assertEqual(0, len(memory.failures)) diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py index e17a9fe..141cdfc 100644 --- a/taskflow/tests/unit/test_types.py +++ b/taskflow/tests/unit/test_types.py @@ -17,8 +17,11 @@ import time import networkx as nx +import six +from taskflow import exceptions as excp from taskflow import test +from taskflow.types import fsm from taskflow.types import graph from taskflow.types import timing as tt from taskflow.types import tree @@ -117,7 +120,7 @@ class TreeTest(test.TestCase): 'primate', 'monkey', 'human']), set(things)) -class StopWatchUtilsTest(test.TestCase): +class StopWatchTest(test.TestCase): def test_no_states(self): watch = tt.StopWatch() self.assertRaises(RuntimeError, watch.stop) @@ -156,3 +159,148 @@ class StopWatchUtilsTest(test.TestCase): with tt.StopWatch() as watch: time.sleep(0.05) self.assertGreater(0.01, watch.elapsed()) + + +class FSMTest(test.TestCase): + def setUp(self): + super(FSMTest, self).setUp() + # NOTE(harlowja): this state machine will never stop if run() is used. + self.jumper = fsm.FSM("down") + self.jumper.add_state('up') + self.jumper.add_state('down') + self.jumper.add_transition('down', 'up', 'jump') + self.jumper.add_transition('up', 'down', 'fall') + self.jumper.add_reaction('up', 'jump', lambda *args: 'fall') + self.jumper.add_reaction('down', 'fall', lambda *args: 'jump') + + def test_bad_start_state(self): + m = fsm.FSM('unknown') + self.assertRaises(excp.NotFound, m.run, 'unknown') + + def test_contains(self): + m = fsm.FSM('unknown') + self.assertNotIn('unknown', m) + m.add_state('unknown') + self.assertIn('unknown', m) + + def test_duplicate_state(self): + m = fsm.FSM('unknown') + m.add_state('unknown') + self.assertRaises(excp.Duplicate, m.add_state, 'unknown') + + def test_duplicate_reaction(self): + self.assertRaises( + # Currently duplicate reactions are not allowed... + excp.Duplicate, + self.jumper.add_reaction, 'down', 'fall', lambda *args: 'skate') + + def test_bad_transition(self): + m = fsm.FSM('unknown') + m.add_state('unknown') + m.add_state('fire') + self.assertRaises(excp.NotFound, m.add_transition, + 'unknown', 'something', 'boom') + self.assertRaises(excp.NotFound, m.add_transition, + 'something', 'unknown', 'boom') + + def test_bad_reaction(self): + m = fsm.FSM('unknown') + m.add_state('unknown') + self.assertRaises(excp.NotFound, m.add_reaction, 'something', 'boom', + lambda *args: 'cough') + + def test_run(self): + m = fsm.FSM('down') + m.add_state('down') + m.add_state('up') + m.add_state('broken', terminal=True) + m.add_transition('down', 'up', 'jump') + m.add_transition('up', 'broken', 'hit-wall') + m.add_reaction('up', 'jump', lambda *args: 'hit-wall') + self.assertEqual(['broken', 'down', 'up'], sorted(m.states)) + self.assertEqual(2, m.events) + m.initialize() + self.assertEqual('down', m.current_state) + self.assertFalse(m.terminated) + m.run('jump') + self.assertTrue(m.terminated) + self.assertEqual('broken', m.current_state) + self.assertRaises(excp.InvalidState, m.run, 'jump', initialize=False) + + def test_on_enter_on_exit(self): + enter_transitions = [] + exit_transitions = [] + + def on_exit(state, event): + exit_transitions.append((state, event)) + + def on_enter(state, event): + enter_transitions.append((state, event)) + + m = fsm.FSM('start') + m.add_state('start', on_exit=on_exit) + m.add_state('down', on_enter=on_enter, on_exit=on_exit) + m.add_state('up', on_enter=on_enter, on_exit=on_exit) + m.add_transition('start', 'down', 'beat') + m.add_transition('down', 'up', 'jump') + m.add_transition('up', 'down', 'fall') + + m.initialize() + m.process_event('beat') + m.process_event('jump') + m.process_event('fall') + self.assertEqual([('down', 'beat'), + ('up', 'jump'), ('down', 'fall')], enter_transitions) + self.assertEqual([('down', 'jump'), ('up', 'fall')], exit_transitions) + + def test_run_iter(self): + up_downs = [] + for (old_state, new_state) in self.jumper.run_iter('jump'): + up_downs.append((old_state, new_state)) + if len(up_downs) >= 3: + break + self.assertEqual([('down', 'up'), ('up', 'down'), ('down', 'up')], + up_downs) + self.assertFalse(self.jumper.terminated) + self.assertEqual('up', self.jumper.current_state) + self.jumper.process_event('fall') + self.assertEqual('down', self.jumper.current_state) + + def test_run_send(self): + up_downs = [] + it = self.jumper.run_iter('jump') + while True: + up_downs.append(it.send(None)) + if len(up_downs) >= 3: + it.close() + break + self.assertEqual('up', self.jumper.current_state) + self.assertFalse(self.jumper.terminated) + self.assertEqual([('down', 'up'), ('up', 'down'), ('down', 'up')], + up_downs) + self.assertRaises(StopIteration, six.next, it) + + def test_run_send_fail(self): + up_downs = [] + it = self.jumper.run_iter('jump') + up_downs.append(six.next(it)) + self.assertRaises(excp.NotFound, it.send, 'fail') + it.close() + self.assertEqual([('down', 'up')], up_downs) + + def test_not_initialized(self): + self.assertRaises(fsm.NotInitialized, + self.jumper.process_event, 'jump') + + def test_iter(self): + transitions = list(self.jumper) + self.assertEqual(2, len(transitions)) + self.assertIn(('up', 'fall', 'down'), transitions) + self.assertIn(('down', 'jump', 'up'), transitions) + + def test_invalid_callbacks(self): + m = fsm.FSM('working') + m.add_state('working') + m.add_state('broken') + self.assertRaises(AssertionError, m.add_state, 'b', on_enter=2) + self.assertRaises(AssertionError, m.add_state, 'b', on_exit=2) |
