summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py169
1 files changed, 147 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 05943c8b..65b2d228 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -3,21 +3,22 @@ from test.testlib import *
from git.async.pool import *
from git.async.task import *
from git.async.util import cpu_count
-
+import threading
import time
class TestThreadTaskNode(InputIteratorThreadTask):
def __init__(self, *args, **kwargs):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
- self.reset()
+ self.reset(self._iterator)
def do_fun(self, item):
self.item_count += 1
return item
- def reset(self):
+ def reset(self, iterator):
self.process_count = 0
self.item_count = 0
+ self._iterator = iterator
def process(self, count=1):
super(TestThreadTaskNode, self).process(count)
@@ -36,6 +37,111 @@ class TestThreadPool(TestBase):
max_threads = cpu_count()
+ def _assert_sync_single_task(self, p):
+ """Performs testing in a synchronized environment"""
+ null_tasks = p.num_tasks() # in case we had some before
+
+ # add a simple task
+ # it iterates n items
+ ni = 20
+ assert ni % 2 == 0, "ni needs to be dividable by 2"
+
+ def make_iter():
+ return iter(range(ni))
+ # END utility
+
+ task = TestThreadTaskNode(make_iter(), 'iterator', None)
+ task.fun = task.do_fun
+
+ assert p.num_tasks() == null_tasks
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ assert isinstance(rc, RPoolChannel)
+ assert task._out_wc is not None
+
+ # pull the result completely - we should get one task, which calls its
+ # function once. In serial mode, the order matches
+ items = rc.read()
+ task._assert(1, ni).reset(make_iter())
+ assert len(items) == ni
+ assert items[0] == 0 and items[-1] == ni-1
+
+ # as the task is done, it should have been removed - we have read everything
+ assert task.is_done()
+ assert p.num_tasks() == null_tasks
+
+ # pull individual items
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ for i in range(ni):
+ items = rc.read(1)
+ assert len(items) == 1
+ assert i == items[0]
+ # END for each item
+ # it couldn't yet notice that the input is depleted as we pulled exactly
+ # ni items - the next one would remove it. Instead, we delete our channel
+ # which triggers orphan handling
+ assert p.num_tasks() == 1 + null_tasks
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ task.reset(make_iter())
+
+ # test min count
+ # if we query 1 item, it will prepare ni / 2
+ task.min_count = ni / 2
+ rc = p.add_task(task)
+ assert len(rc.read(1)) == 1 # 1
+ assert len(rc.read(1)) == 1
+ assert len(rc.read(ni-2)) == ni - 2 # rest - it has ni/2 - 2 on the queue, and pulls ni-2
+ task._assert(2, ni) # two chunks, 20 calls ( all items )
+ assert p.num_tasks() == 1 + null_tasks # it still doesn't know, didn't read too much
+ assert len(rc.read()) == 0 # now we read too much and it's done
+ assert p.num_tasks() == null_tasks
+
+ # test chunking
+ # we always want 4 chunks, these could go to individual nodes
+ task.reset(make_iter())
+ task.max_chunksize = ni / 4 # 4 chunks
+ rc = p.add_task(task)
+ # must read a specific item count
+ # count is still at ni / 2 - here we want more than that
+ assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # make sure it's uneven ;)
+ assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
+
+ # END read chunks
+ task._assert(ni / 4, ni) # read two times, got 4 processing steps
+ assert p.num_tasks() == null_tasks # depleted
+
+ # but this only hits if we want too many items, if we want less, it could
+ # still do too much - hence we set the min_count to the same number to enforce
+ # at least ni / 4 items to be processed, no matter what we request
+ task.reset(make_iter())
+ task.min_count = None
+ rc = p.add_task(task)
+ for i in range(ni):
+ assert rc.read(1)[0] == i
+ # END pull individual items
+ # too many processing counts ;)
+ task._assert(ni, ni)
+ assert p.num_tasks() == 1 + null_tasks
+ assert p.del_task(task) is p # del manually this time
+ assert p.num_tasks() == null_tasks
+
+ # now we set the minimum count to reduce the number of processing counts
+ task.reset(make_iter())
+ task.min_count = ni / 4
+ rc = p.add_task(task)
+ for i in range(ni):
+ assert rc.read(1)[0] == i
+ # END for each item
+ task._assert(ni / 4, ni)
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ def _assert_async_dependent_tasks(self, p):
+ pass
+
def test_base(self):
p = ThreadPool()
@@ -50,30 +156,49 @@ class TestThreadPool(TestBase):
p.set_size(i)
assert p.size() == i
- # currently in serial mode !
+ # SINGLE TASK SERIAL SYNC MODE
+ ##############################
+ # put a few unrelated tasks that we forget about
+ urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+ urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+ assert p.num_tasks() == 2
+ self._assert_sync_single_task(p)
+ assert p.num_tasks() == 2
+ del(urc1)
+ del(urc2)
+ assert p.num_tasks() == 0
- # add a simple task
- # it iterates n items
- ni = 20
- task = TestThreadTaskNode(iter(range(ni)), 'iterator', None)
- task.fun = task.do_fun
- assert p.num_tasks() == 0
- rc = p.add_task(task)
- assert p.num_tasks() == 1
- assert isinstance(rc, RPoolChannel)
- assert task._out_wc is not None
+ # DEPENDENT TASKS SERIAL
+ ########################
+ self._assert_async_dependent_tasks(p)
+
+
+ # SINGLE TASK THREADED SYNC MODE
+ ################################
+ # step one gear up - just one thread for now.
+ num_threads = len(threading.enumerate())
+ p.set_size(1)
+ assert len(threading.enumerate()) == num_threads + 1
+ # deleting the pool stops its threads - just to be sure ;)
+ del(p)
+ assert len(threading.enumerate()) == num_threads
+
+ p = ThreadPool(1)
+ assert len(threading.enumerate()) == num_threads + 1
+
+ # here we go
+ self._assert_sync_single_task(p)
+
- # pull the result completely - we should get one task, which calls its
- # function once. In serial mode, the order matches
- items = rc.read()
- task._assert(1, ni).reset()
- assert len(items) == ni
- assert items[0] == 0 and items[-1] == ni-1
+ # SINGLE TASK ASYNC MODE
+ ########################
+ # two threads to compete for a single task
- # switch to threaded mode - just one thread for now
- # two threads to compete for tasks
+ # DEPENDENT TASK ASYNC MODE
+ ###########################
+ # self._assert_async_dependent_tasks(p)