summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py43
1 files changed, 32 insertions, 11 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 65b2d228..628e2a93 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -10,9 +10,12 @@ class TestThreadTaskNode(InputIteratorThreadTask):
def __init__(self, *args, **kwargs):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
self.reset(self._iterator)
+ self.should_fail = False
def do_fun(self, item):
self.item_count += 1
+ if self.should_fail:
+ raise AssertionError("I am failing just for the fun of it")
return item
def reset(self, iterator):
@@ -29,7 +32,7 @@ class TestThreadTaskNode(InputIteratorThreadTask):
:return: self"""
assert self.process_count == pc
assert self.item_count == fc
-
+ assert not self.error()
return self
@@ -60,10 +63,10 @@ class TestThreadPool(TestBase):
assert task._out_wc is not None
# pull the result completely - we should get one task, which calls its
- # function once. In serial mode, the order matches
+ # function once. In sync mode, the order matches
items = rc.read()
- task._assert(1, ni).reset(make_iter())
assert len(items) == ni
+ task._assert(1, ni).reset(make_iter())
assert items[0] == 0 and items[-1] == ni-1
# as the task is done, it should have been removed - we have read everything
@@ -91,13 +94,17 @@ class TestThreadPool(TestBase):
# if we query 1 item, it will prepare ni / 2
task.min_count = ni / 2
rc = p.add_task(task)
- assert len(rc.read(1)) == 1 # 1
- assert len(rc.read(1)) == 1
- assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
- task._assert(2, ni) # two chunks, 20 calls ( all items )
- assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much
- assert len(rc.read()) == 0 # now we read too much and its done
+ assert len(rc.read(1)) == 1 # processes ni / 2
+ assert len(rc.read(1)) == 1 # processes nothing
+ # rest - it has ni/2 - 2 on the queue, and pulls ni-2
+ # It wants too much, so the task realizes its done. The task
+ # doesn't care about the items in its output channel
+ assert len(rc.read(ni-2)) == ni - 2
assert p.num_tasks() == null_tasks
+ task._assert(2, ni) # two chunks, 20 calls ( all items )
+
+ # its already done, gives us no more
+ assert len(rc.read()) == 0
# test chunking
# we always want 4 chunks, these could go to individual nodes
@@ -135,11 +142,25 @@ class TestThreadPool(TestBase):
for i in range(ni):
assert rc.read(1)[0] == i
# END for each item
- task._assert(ni / 4, ni)
+ task._assert(ni / task.min_count, ni)
del(rc)
assert p.num_tasks() == null_tasks
+ # test failure
+ # on failure, the processing stops and the task is finished, keeping
+ # his error for later
+ task.reset(make_iter())
+ task.should_fail = True
+ rc = p.add_task(task)
+ assert len(rc.read()) == 0 # failure on first item
+ assert isinstance(task.error(), AssertionError)
+ assert p.num_tasks() == null_tasks
+
def _assert_async_dependent_tasks(self, p):
+ # includes failure in center task, 'recursive' orphan cleanup
+ # This will also verify that the channel-close mechanism works
+ # t1 -> t2 -> t3
+ # t1 -> x -> t3
pass
def test_base(self):
@@ -199,6 +220,6 @@ class TestThreadPool(TestBase):
# DEPENDENT TASK ASYNC MODE
###########################
- # self._assert_async_dependent_tasks(p)
+ self._assert_async_dependent_tasks(p)