diff options
| author | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2014-03-21 17:07:03 +0400 |
|---|---|---|
| committer | Ivan A. Melnikov <imelnikov@griddynamics.com> | 2014-03-22 19:22:00 +0400 |
| commit | 261d69a75915f869b2e7bf351c47f202c725837c (patch) | |
| tree | 256c4686b20bff1388f2abd55ce3a71044fa49a9 /taskflow/tests/unit/patterns/test_graph_flow.py | |
| parent | d162c82d44d1a6f4a7833e5f14c438f1a480239e (diff) | |
| download | taskflow-261d69a75915f869b2e7bf351c47f202c725837c.tar.gz | |
Rework graph flow unit tests
This commit adds unit tests that check graph flow methods without
executing or flattening it. Now-redundant tests from other test
suites are deleted.
Change-Id: I8dafe0f9b295428831eddb3f9fd48f042d2f1ffc
Diffstat (limited to 'taskflow/tests/unit/patterns/test_graph_flow.py')
| -rw-r--r-- | taskflow/tests/unit/patterns/test_graph_flow.py | 259 |
1 files changed, 259 insertions, 0 deletions
diff --git a/taskflow/tests/unit/patterns/test_graph_flow.py b/taskflow/tests/unit/patterns/test_graph_flow.py new file mode 100644 index 0000000..2a95ad2 --- /dev/null +++ b/taskflow/tests/unit/patterns/test_graph_flow.py @@ -0,0 +1,259 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from taskflow import exceptions as exc +from taskflow.patterns import graph_flow as gf +from taskflow import retry + +from taskflow import test +from taskflow.tests import utils + + +def _task(name, provides=None, requires=None): + return utils.ProvidesRequiresTask(name, provides, requires) + + +class GraphFlowTest(test.TestCase): + + def test_graph_flow_starts_as_empty(self): + f = gf.Flow('test') + + self.assertEqual(len(f), 0) + self.assertEqual(list(f), []) + self.assertEqual(list(f.iter_links()), []) + + self.assertEqual(f.requires, set()) + self.assertEqual(f.provides, set()) + + expected = 'taskflow.patterns.graph_flow.Flow: test; 0' + self.assertEqual(str(f), expected) + + def test_graph_flow_add_nothing(self): + f = gf.Flow('test') + result = f.add() + self.assertIs(f, result) + self.assertEqual(len(f), 0) + + def test_graph_flow_one_task(self): + f = gf.Flow('test') + task = _task(name='task1', requires=['a', 'b'], provides=['c', 'd']) + result = f.add(task) + + self.assertIs(f, result) + + self.assertEqual(len(f), 1) + self.assertEqual(list(f), [task]) + self.assertEqual(list(f.iter_links()), []) + self.assertEqual(f.requires, set(['a', 'b'])) + self.assertEqual(f.provides, set(['c', 'd'])) + + def test_graph_flow_two_independent_tasks(self): + task1 = _task(name='task1') + task2 = _task(name='task2') + f = gf.Flow('test').add(task1, task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), []) + + def test_graph_flow_two_dependent_tasks(self): + task1 = _task(name='task1', provides=['a']) + task2 = _task(name='task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a'])}) + ]) + + self.assertEqual(f.requires, set()) + self.assertEqual(f.provides, set(['a'])) + + def test_graph_flow_two_dependent_tasks_two_different_calls(self): + task1 = _task(name='task1', provides=['a']) + task2 = _task(name='task2', requires=['a']) + f = gf.Flow('test').add(task1).add(task2) + + self.assertEqual(len(f), 2) + self.assertItemsEqual(f, [task1, task2]) + self.assertEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a'])}) + ]) + + def test_graph_flow_two_task_same_provide(self): + task1 = _task(name='task1', provides=['a', 'b']) + task2 = _task(name='task2', provides=['a', 'c']) + f = gf.Flow('test') + self.assertRaises(exc.DependencyFailure, f.add, task2, task1) + + def test_graph_flow_with_retry(self): + ret = retry.AlwaysRevert(requires=['a'], provides=['b']) + f = gf.Flow('test', ret) + self.assertIs(f.retry, ret) + self.assertEqual(ret.name, 'test_retry') + + self.assertEqual(f.requires, set(['a'])) + self.assertEqual(f.provides, set(['b'])) + + def test_graph_flow_ordering(self): + task1 = _task('task1', provides=set(['a', 'b'])) + task2 = _task('task2', provides=['c'], requires=['a', 'b']) + task3 = _task('task3', provides=[], requires=['c']) + f = gf.Flow('test').add(task1, task2, task3) + + self.assertEqual(3, len(f)) + + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, {'reasons': set(['a', 'b'])}), + (task2, task3, {'reasons': set(['c'])}) + ]) + + def test_graph_flow_links(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1, task2) + linked = f.link(task1, task2) + self.assertIs(linked, f) + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, {'manual': True}) + ]) + + def test_graph_flow_links_and_dependencies(self): + task1 = _task('task1', provides=['a']) + task2 = _task('task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + linked = f.link(task1, task2) + self.assertIs(linked, f) + expected_meta = { + 'manual': True, + 'reasons': set(['a']) + } + self.assertItemsEqual(list(f.iter_links()), [ + (task1, task2, expected_meta) + ]) + + def test_graph_flow_link_from_unknown_node(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task2) + self.assertRaisesRegexp(ValueError, 'Item .* not found to link from', + f.link, task1, task2) + + def test_graph_flow_link_to_unknown_node(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1) + self.assertRaisesRegexp(ValueError, 'Item .* not found to link to', + f.link, task1, task2) + + def test_graph_flow_link_raises_on_cycle(self): + task1 = _task('task1', provides=['a']) + task2 = _task('task2', requires=['a']) + f = gf.Flow('test').add(task1, task2) + self.assertRaises(exc.DependencyFailure, f.link, task2, task1) + + def test_graph_flow_link_raises_on_link_cycle(self): + task1 = _task('task1') + task2 = _task('task2') + f = gf.Flow('test').add(task1, task2) + f.link(task1, task2) + self.assertRaises(exc.DependencyFailure, f.link, task2, task1) + + def test_graph_flow_dependency_cycle(self): + task1 = _task('task1', provides=['a'], requires=['c']) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=['c'], requires=['b']) + f = gf.Flow('test').add(task1, task2) + self.assertRaises(exc.DependencyFailure, f.add, task3) + + +class TargetedGraphFlowTest(test.TestCase): + + def test_targeted_flow_restricts(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=[], requires=['b']) + task4 = _task('task4', provides=[], requires=['b']) + f.add(task1, task2, task3, task4) + f.set_target(task3) + self.assertEqual(len(f), 3) + self.assertItemsEqual(f, [task1, task2, task3]) + self.assertNotIn('c', f.provides) + + def test_targeted_flow_reset(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + task3 = _task('task3', provides=[], requires=['b']) + task4 = _task('task4', provides=['c'], requires=['b']) + f.add(task1, task2, task3, task4) + f.set_target(task3) + f.reset_target() + self.assertEqual(len(f), 4) + self.assertItemsEqual(f, [task1, task2, task3, task4]) + self.assertIn('c', f.provides) + + def test_targeted_flow_bad_target(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + task2 = _task('task2', provides=['b'], requires=['a']) + f.add(task1) + self.assertRaisesRegexp(ValueError, '^Item .* not found', + f.set_target, task2) + + def test_targeted_flow_one_node(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=['a'], requires=[]) + f.add(task1) + f.set_target(task1) + self.assertEqual(len(f), 1) + self.assertItemsEqual(f, [task1]) + + def test_recache_on_add(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=['a']) + f.add(task1) + f.set_target(task1) + self.assertEqual(1, len(f)) + task2 = _task('task2', provides=['a'], requires=[]) + f.add(task2) + self.assertEqual(2, len(f)) + + def test_recache_on_add_no_deps(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=[]) + f.add(task1) + f.set_target(task1) + self.assertEqual(1, len(f)) + task2 = _task('task2', provides=[], requires=[]) + f.add(task2) + self.assertEqual(1, len(f)) + + def test_recache_on_link(self): + f = gf.TargetedFlow("test") + task1 = _task('task1', provides=[], requires=[]) + task2 = _task('task2', provides=[], requires=[]) + f.add(task1, task2) + f.set_target(task1) + self.assertEqual(1, len(f)) + + f.link(task2, task1) + self.assertEqual(2, len(f)) + self.assertEqual(list(f.iter_links()), [ + (task2, task1, {'manual': True}) + ]) |
