diff options
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 65b2d228..628e2a93 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -10,9 +10,12 @@ class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) self.reset(self._iterator) + self.should_fail = False def do_fun(self, item): self.item_count += 1 + if self.should_fail: + raise AssertionError("I am failing just for the fun of it") return item def reset(self, iterator): @@ -29,7 +32,7 @@ class TestThreadTaskNode(InputIteratorThreadTask): :return: self""" assert self.process_count == pc assert self.item_count == fc - + assert not self.error() return self @@ -60,10 +63,10 @@ class TestThreadPool(TestBase): assert task._out_wc is not None # pull the result completely - we should get one task, which calls its - # function once. In serial mode, the order matches + # function once. In sync mode, the order matches items = rc.read() - task._assert(1, ni).reset(make_iter()) assert len(items) == ni + task._assert(1, ni).reset(make_iter()) assert items[0] == 0 and items[-1] == ni-1 # as the task is done, it should have been removed - we have read everything @@ -91,13 +94,17 @@ class TestThreadPool(TestBase): # if we query 1 item, it will prepare ni / 2 task.min_count = ni / 2 rc = p.add_task(task) - assert len(rc.read(1)) == 1 # 1 - assert len(rc.read(1)) == 1 - assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2 - task._assert(2, ni) # two chunks, 20 calls ( all items ) - assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much - assert len(rc.read()) == 0 # now we read too much and its done + assert len(rc.read(1)) == 1 # processes ni / 2 + assert len(rc.read(1)) == 1 # processes nothing + # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + # It wants too much, so the task realizes its done. The task + # doesn't care about the items in its output channel + assert len(rc.read(ni-2)) == ni - 2 assert p.num_tasks() == null_tasks + task._assert(2, ni) # two chunks, 20 calls ( all items ) + + # its already done, gives us no more + assert len(rc.read()) == 0 # test chunking # we always want 4 chunks, these could go to individual nodes @@ -135,11 +142,25 @@ class TestThreadPool(TestBase): for i in range(ni): assert rc.read(1)[0] == i # END for each item - task._assert(ni / 4, ni) + task._assert(ni / task.min_count, ni) del(rc) assert p.num_tasks() == null_tasks + # test failure + # on failure, the processing stops and the task is finished, keeping + # his error for later + task.reset(make_iter()) + task.should_fail = True + rc = p.add_task(task) + assert len(rc.read()) == 0 # failure on first item + assert isinstance(task.error(), AssertionError) + assert p.num_tasks() == null_tasks + def _assert_async_dependent_tasks(self, p): + # includes failure in center task, 'recursive' orphan cleanup + # This will also verify that the channel-close mechanism works + # t1 -> t2 -> t3 + # t1 -> x -> t3 pass def test_base(self): @@ -199,6 +220,6 @@ class TestThreadPool(TestBase): # DEPENDENT TASK ASYNC MODE ########################### - # self._assert_async_dependent_tasks(p) + self._assert_async_dependent_tasks(p) |