diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 21:15:13 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 21:31:51 +0200 |
commit | 1b27292936c81637f6b9a7141dafaad1126f268e (patch) | |
tree | f629d098429099934a35798312e6e0660df0d677 /test/git/async/test_pool.py | |
parent | b3cde0ee162b8f0cb67da981311c8f9c16050a62 (diff) | |
download | gitpython-1b27292936c81637f6b9a7141dafaad1126f268e.tar.gz |
Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 169 |
1 files changed, 147 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 05943c8b..65b2d228 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -3,21 +3,22 @@ from test.testlib import * from git.async.pool import * from git.async.task import * from git.async.util import cpu_count - +import threading import time class TestThreadTaskNode(InputIteratorThreadTask): def __init__(self, *args, **kwargs): super(TestThreadTaskNode, self).__init__(*args, **kwargs) - self.reset() + self.reset(self._iterator) def do_fun(self, item): self.item_count += 1 return item - def reset(self): + def reset(self, iterator): self.process_count = 0 self.item_count = 0 + self._iterator = iterator def process(self, count=1): super(TestThreadTaskNode, self).process(count) @@ -36,6 +37,111 @@ class TestThreadPool(TestBase): max_threads = cpu_count() + def _assert_sync_single_task(self, p): + """Performs testing in a synchronized environment""" + null_tasks = p.num_tasks() # in case we had some before + + # add a simple task + # it iterates n items + ni = 20 + assert ni % 2 == 0, "ni needs to be dividable by 2" + + def make_iter(): + return iter(range(ni)) + # END utility + + task = TestThreadTaskNode(make_iter(), 'iterator', None) + task.fun = task.do_fun + + assert p.num_tasks() == null_tasks + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + assert isinstance(rc, RPoolChannel) + assert task._out_wc is not None + + # pull the result completely - we should get one task, which calls its + # function once. In serial mode, the order matches + items = rc.read() + task._assert(1, ni).reset(make_iter()) + assert len(items) == ni + assert items[0] == 0 and items[-1] == ni-1 + + # as the task is done, it should have been removed - we have read everything + assert task.is_done() + assert p.num_tasks() == null_tasks + + # pull individual items + rc = p.add_task(task) + assert p.num_tasks() == 1 + null_tasks + for i in range(ni): + items = rc.read(1) + assert len(items) == 1 + assert i == items[0] + # END for each item + # it couldn't yet notice that the input is depleted as we pulled exaclty + # ni items - the next one would remove it. Instead, we delete our channel + # which triggers orphan handling + assert p.num_tasks() == 1 + null_tasks + del(rc) + assert p.num_tasks() == null_tasks + + task.reset(make_iter()) + + # test min count + # if we query 1 item, it will prepare ni / 2 + task.min_count = ni / 2 + rc = p.add_task(task) + assert len(rc.read(1)) == 1 # 1 + assert len(rc.read(1)) == 1 + assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2 + task._assert(2, ni) # two chunks, 20 calls ( all items ) + assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much + assert len(rc.read()) == 0 # now we read too much and its done + assert p.num_tasks() == null_tasks + + # test chunking + # we always want 4 chunks, these could go to individual nodes + task.reset(make_iter()) + task.max_chunksize = ni / 4 # 4 chunks + rc = p.add_task(task) + # must read a specific item count + # count is still at ni / 2 - here we want more than that + assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # make sure its uneven ;) + assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2 + + # END read chunks + task._assert(ni / 4, ni) # read two times, got 4 processing steps + assert p.num_tasks() == null_tasks # depleted + + # but this only hits if we want too many items, if we want less, it could + # still do too much - hence we set the min_count to the same number to enforce + # at least ni / 4 items to be preocessed, no matter what we request + task.reset(make_iter()) + task.min_count = None + rc = p.add_task(task) + for i in range(ni): + assert rc.read(1)[0] == i + # END pull individual items + # too many processing counts ;) + task._assert(ni, ni) + assert p.num_tasks() == 1 + null_tasks + assert p.del_task(task) is p # del manually this time + assert p.num_tasks() == null_tasks + + # now with we set the minimum count to reduce the number of processing counts + task.reset(make_iter()) + task.min_count = ni / 4 + rc = p.add_task(task) + for i in range(ni): + assert rc.read(1)[0] == i + # END for each item + task._assert(ni / 4, ni) + del(rc) + assert p.num_tasks() == null_tasks + + def _assert_async_dependent_tasks(self, p): + pass + def test_base(self): p = ThreadPool() @@ -50,30 +156,49 @@ class TestThreadPool(TestBase): p.set_size(i) assert p.size() == i - # currently in serial mode ! + # SINGLE TASK SERIAL SYNC MODE + ############################## + # put a few unrelated tasks that we forget about + urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + assert p.num_tasks() == 2 + self._assert_sync_single_task(p) + assert p.num_tasks() == 2 + del(urc1) + del(urc2) + assert p.num_tasks() == 0 - # add a simple task - # it iterates n items - ni = 20 - task = TestThreadTaskNode(iter(range(ni)), 'iterator', None) - task.fun = task.do_fun - assert p.num_tasks() == 0 - rc = p.add_task(task) - assert p.num_tasks() == 1 - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + # DEPENDENT TASKS SERIAL + ######################## + self._assert_async_dependent_tasks(p) + + + # SINGLE TASK THREADED SYNC MODE + ################################ + # step one gear up - just one thread for now. + num_threads = len(threading.enumerate()) + p.set_size(1) + assert len(threading.enumerate()) == num_threads + 1 + # deleting the pool stops its threads - just to be sure ;) + del(p) + assert len(threading.enumerate()) == num_threads + + p = ThreadPool(1) + assert len(threading.enumerate()) == num_threads + 1 + + # here we go + self._assert_sync_single_task(p) + - # pull the result completely - we should get one task, which calls its - # function once. In serial mode, the order matches - items = rc.read() - task._assert(1, ni).reset() - assert len(items) == ni - assert items[0] == 0 and items[-1] == ni-1 + # SINGLE TASK ASYNC MODE + ######################## + # two threads to compete for a single task - # switch to threaded mode - just one thread for now - # two threads to compete for tasks + # DEPENDENT TASK ASYNC MODE + ########################### + # self._assert_async_dependent_tasks(p) |