summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-09 17:54:21 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-09-07 08:49:15 -0700
commite1ef04492ebddab02de2f277079ae0c7448c3b40 (patch)
tree6e761c97ebcad5e804774515105e5e229707f323 /taskflow/tests/unit
parentdb9fee947dd6614774275ab11884bf5b9a8c1233 (diff)
downloadtaskflow-e1ef04492ebddab02de2f277079ae0c7448c3b40.tar.gz
Translate the engine runner into a well defined state-machine
Instead of having a ad-hoc state-machine being used to perform the various runtime actions (performed when a engine is ran) we can gain a much more explict execution model by translating that ad-hoc state machine to an explicit one instead... This commit does just that, it adds a new fsm type that can be used to create, define and run state-machines that respond to various events (internal or external) and uses it in the runner action engine module to run the previously ad-hc/implicit state-machine. Implements blueprint runner-state-machine Change-Id: Id35633a9de707f3ffb1a4b7e9619af1be009317f
Diffstat (limited to 'taskflow/tests/unit')
-rw-r--r--taskflow/tests/unit/action_engine/test_runner.py158
-rw-r--r--taskflow/tests/unit/test_types.py150
2 files changed, 297 insertions, 11 deletions
diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py
index d7927f8..2e18f6b 100644
--- a/taskflow/tests/unit/action_engine/test_runner.py
+++ b/taskflow/tests/unit/action_engine/test_runner.py
@@ -18,17 +18,20 @@ import six
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
+from taskflow.engines.action_engine import runner
from taskflow.engines.action_engine import runtime
+from taskflow import exceptions as excp
from taskflow.patterns import linear_flow as lf
from taskflow import states as st
from taskflow import storage
from taskflow import test
from taskflow.tests import utils as test_utils
+from taskflow.types import fsm
from taskflow.utils import misc
from taskflow.utils import persistence_utils as pu
-class RunnerTest(test.TestCase):
+class _RunnerTestMixin(object):
def _make_runtime(self, flow, initial_state=None):
compilation = compiler.PatternCompiler().compile(flow)
flow_detail = pu.create_flow_detail(flow)
@@ -42,18 +45,20 @@ class RunnerTest(test.TestCase):
task_executor = executor.SerialTaskExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
- return runtime.Runtime(compiler.PatternCompiler().compile(flow),
- store, task_notifier, task_executor)
+ return runtime.Runtime(compilation, store,
+ task_notifier, task_executor)
+
+class RunnerTest(test.TestCase, _RunnerTestMixin):
def test_running(self):
flow = lf.Flow("root")
flow.add(*test_utils.make_many(1))
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
rt = self._make_runtime(flow, initial_state=st.SUSPENDED)
- self.assertFalse(rt.runner.is_running())
+ self.assertFalse(rt.runner.runnable())
def test_run_iterations(self):
flow = lf.Flow("root")
@@ -62,7 +67,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
it = rt.runner.run_iter()
state, failures = six.next(it)
@@ -94,7 +99,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
@@ -110,7 +115,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
@@ -128,7 +133,7 @@ class RunnerTest(test.TestCase):
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = []
for state, failures in rt.runner.run_iter():
@@ -152,7 +157,7 @@ class RunnerTest(test.TestCase):
flow.add(*happy_tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
- self.assertTrue(rt.runner.is_running())
+ self.assertTrue(rt.runner.runnable())
transitions = []
for state, failures in rt.runner.run_iter():
@@ -167,3 +172,136 @@ class RunnerTest(test.TestCase):
rt.storage.get_atom_state(happy_tasks[0].name))
self.assertEqual(st.FAILURE,
rt.storage.get_atom_state(sad_tasks[0].name))
+
+
+class RunnerBuilderTest(test.TestCase, _RunnerTestMixin):
+ def test_builder_manual_process(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 1, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+ self.assertRaises(fsm.NotInitialized, machine.process_event, 'poke')
+
+ # Should now be pending...
+ self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[0].name))
+
+ machine.initialize()
+ self.assertEqual(runner._UNDEFINED, machine.current_state)
+ self.assertFalse(machine.terminated)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+ last_state = machine.current_state
+
+ reaction, terminal = machine.process_event('start')
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.RESUMING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ 'start', *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.SCHEDULING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertEqual(st.WAITING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ # Should now be running...
+ self.assertEqual(st.RUNNING, rt.storage.get_atom_state(tasks[0].name))
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertIsNotNone(reaction)
+ self.assertEqual(st.ANALYZING, machine.current_state)
+ self.assertRaises(excp.NotFound, machine.process_event, 'poke')
+
+ last_state = machine.current_state
+ cb, args, kwargs = reaction
+ next_event = cb(last_state, machine.current_state,
+ next_event, *args, **kwargs)
+ reaction, terminal = machine.process_event(next_event)
+ self.assertFalse(terminal)
+ self.assertEqual(runner._GAME_OVER, machine.current_state)
+
+ # Should now be done...
+ self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_automatic_process(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 1, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._UNDEFINED, st.RESUMING), transitions[0])
+ self.assertEqual((runner._GAME_OVER, st.SUCCESS), transitions[-1])
+ self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_automatic_process_failure(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(1, task_cls=test_utils.NastyFailingTask)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._GAME_OVER, st.FAILURE), transitions[-1])
+ self.assertEqual(1, len(memory.failures))
+
+ def test_builder_automatic_process_reverted(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(1, task_cls=test_utils.TaskWithFailure)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ self.assertTrue(rt.runner.builder.runnable())
+
+ transitions = list(machine.run_iter('start'))
+ self.assertEqual((runner._GAME_OVER, st.REVERTED), transitions[-1])
+ self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
+
+ def test_builder_expected_transition_occurrences(self):
+ flow = lf.Flow("root")
+ tasks = test_utils.make_many(
+ 10, task_cls=test_utils.TaskNoRequiresNoReturns)
+ flow.add(*tasks)
+
+ rt = self._make_runtime(flow, initial_state=st.RUNNING)
+ machine, memory = rt.runner.builder.build()
+ transitions = list(machine.run_iter('start'))
+
+ occurrences = dict((t, transitions.count(t)) for t in transitions)
+ self.assertEqual(10, occurrences.get((st.SCHEDULING, st.WAITING)))
+ self.assertEqual(10, occurrences.get((st.WAITING, st.ANALYZING)))
+ self.assertEqual(9, occurrences.get((st.ANALYZING, st.SCHEDULING)))
+ self.assertEqual(1, occurrences.get((runner._GAME_OVER, st.SUCCESS)))
+ self.assertEqual(1, occurrences.get((runner._UNDEFINED, st.RESUMING)))
+
+ self.assertEqual(0, len(memory.next_nodes))
+ self.assertEqual(0, len(memory.not_done))
+ self.assertEqual(0, len(memory.failures))
diff --git a/taskflow/tests/unit/test_types.py b/taskflow/tests/unit/test_types.py
index e17a9fe..141cdfc 100644
--- a/taskflow/tests/unit/test_types.py
+++ b/taskflow/tests/unit/test_types.py
@@ -17,8 +17,11 @@
import time
import networkx as nx
+import six
+from taskflow import exceptions as excp
from taskflow import test
+from taskflow.types import fsm
from taskflow.types import graph
from taskflow.types import timing as tt
from taskflow.types import tree
@@ -117,7 +120,7 @@ class TreeTest(test.TestCase):
'primate', 'monkey', 'human']), set(things))
-class StopWatchUtilsTest(test.TestCase):
+class StopWatchTest(test.TestCase):
def test_no_states(self):
watch = tt.StopWatch()
self.assertRaises(RuntimeError, watch.stop)
@@ -156,3 +159,148 @@ class StopWatchUtilsTest(test.TestCase):
with tt.StopWatch() as watch:
time.sleep(0.05)
self.assertGreater(0.01, watch.elapsed())
+
+
+class FSMTest(test.TestCase):
+ def setUp(self):
+ super(FSMTest, self).setUp()
+ # NOTE(harlowja): this state machine will never stop if run() is used.
+ self.jumper = fsm.FSM("down")
+ self.jumper.add_state('up')
+ self.jumper.add_state('down')
+ self.jumper.add_transition('down', 'up', 'jump')
+ self.jumper.add_transition('up', 'down', 'fall')
+ self.jumper.add_reaction('up', 'jump', lambda *args: 'fall')
+ self.jumper.add_reaction('down', 'fall', lambda *args: 'jump')
+
+ def test_bad_start_state(self):
+ m = fsm.FSM('unknown')
+ self.assertRaises(excp.NotFound, m.run, 'unknown')
+
+ def test_contains(self):
+ m = fsm.FSM('unknown')
+ self.assertNotIn('unknown', m)
+ m.add_state('unknown')
+ self.assertIn('unknown', m)
+
+ def test_duplicate_state(self):
+ m = fsm.FSM('unknown')
+ m.add_state('unknown')
+ self.assertRaises(excp.Duplicate, m.add_state, 'unknown')
+
+ def test_duplicate_reaction(self):
+ self.assertRaises(
+ # Currently duplicate reactions are not allowed...
+ excp.Duplicate,
+ self.jumper.add_reaction, 'down', 'fall', lambda *args: 'skate')
+
+ def test_bad_transition(self):
+ m = fsm.FSM('unknown')
+ m.add_state('unknown')
+ m.add_state('fire')
+ self.assertRaises(excp.NotFound, m.add_transition,
+ 'unknown', 'something', 'boom')
+ self.assertRaises(excp.NotFound, m.add_transition,
+ 'something', 'unknown', 'boom')
+
+ def test_bad_reaction(self):
+ m = fsm.FSM('unknown')
+ m.add_state('unknown')
+ self.assertRaises(excp.NotFound, m.add_reaction, 'something', 'boom',
+ lambda *args: 'cough')
+
+ def test_run(self):
+ m = fsm.FSM('down')
+ m.add_state('down')
+ m.add_state('up')
+ m.add_state('broken', terminal=True)
+ m.add_transition('down', 'up', 'jump')
+ m.add_transition('up', 'broken', 'hit-wall')
+ m.add_reaction('up', 'jump', lambda *args: 'hit-wall')
+ self.assertEqual(['broken', 'down', 'up'], sorted(m.states))
+ self.assertEqual(2, m.events)
+ m.initialize()
+ self.assertEqual('down', m.current_state)
+ self.assertFalse(m.terminated)
+ m.run('jump')
+ self.assertTrue(m.terminated)
+ self.assertEqual('broken', m.current_state)
+ self.assertRaises(excp.InvalidState, m.run, 'jump', initialize=False)
+
+ def test_on_enter_on_exit(self):
+ enter_transitions = []
+ exit_transitions = []
+
+ def on_exit(state, event):
+ exit_transitions.append((state, event))
+
+ def on_enter(state, event):
+ enter_transitions.append((state, event))
+
+ m = fsm.FSM('start')
+ m.add_state('start', on_exit=on_exit)
+ m.add_state('down', on_enter=on_enter, on_exit=on_exit)
+ m.add_state('up', on_enter=on_enter, on_exit=on_exit)
+ m.add_transition('start', 'down', 'beat')
+ m.add_transition('down', 'up', 'jump')
+ m.add_transition('up', 'down', 'fall')
+
+ m.initialize()
+ m.process_event('beat')
+ m.process_event('jump')
+ m.process_event('fall')
+ self.assertEqual([('down', 'beat'),
+ ('up', 'jump'), ('down', 'fall')], enter_transitions)
+ self.assertEqual([('down', 'jump'), ('up', 'fall')], exit_transitions)
+
+ def test_run_iter(self):
+ up_downs = []
+ for (old_state, new_state) in self.jumper.run_iter('jump'):
+ up_downs.append((old_state, new_state))
+ if len(up_downs) >= 3:
+ break
+ self.assertEqual([('down', 'up'), ('up', 'down'), ('down', 'up')],
+ up_downs)
+ self.assertFalse(self.jumper.terminated)
+ self.assertEqual('up', self.jumper.current_state)
+ self.jumper.process_event('fall')
+ self.assertEqual('down', self.jumper.current_state)
+
+ def test_run_send(self):
+ up_downs = []
+ it = self.jumper.run_iter('jump')
+ while True:
+ up_downs.append(it.send(None))
+ if len(up_downs) >= 3:
+ it.close()
+ break
+ self.assertEqual('up', self.jumper.current_state)
+ self.assertFalse(self.jumper.terminated)
+ self.assertEqual([('down', 'up'), ('up', 'down'), ('down', 'up')],
+ up_downs)
+ self.assertRaises(StopIteration, six.next, it)
+
+ def test_run_send_fail(self):
+ up_downs = []
+ it = self.jumper.run_iter('jump')
+ up_downs.append(six.next(it))
+ self.assertRaises(excp.NotFound, it.send, 'fail')
+ it.close()
+ self.assertEqual([('down', 'up')], up_downs)
+
+ def test_not_initialized(self):
+ self.assertRaises(fsm.NotInitialized,
+ self.jumper.process_event, 'jump')
+
+ def test_iter(self):
+ transitions = list(self.jumper)
+ self.assertEqual(2, len(transitions))
+ self.assertIn(('up', 'fall', 'down'), transitions)
+ self.assertIn(('down', 'jump', 'up'), transitions)
+
+ def test_invalid_callbacks(self):
+ m = fsm.FSM('working')
+ m.add_state('working')
+ m.add_state('broken')
+ self.assertRaises(AssertionError, m.add_state, 'b', on_enter=2)
+ self.assertRaises(AssertionError, m.add_state, 'b', on_exit=2)