summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-07 13:05:35 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 13:05:35 +0200
commit8c3c271b0d6b5f56b86e3f177caf3e916b509b52 (patch)
tree1a8341bb622d8ae18c6eb5061f5845619502e648 /test/git/async/test_pool.py
parent619662a9138fd78df02c52cae6dc89db1d70a0e5 (diff)
downloadgitpython-8c3c271b0d6b5f56b86e3f177caf3e916b509b52.tar.gz
Added task order cache, and a lock to prevent us walking the graph while changing tasks
Now processing more items to test performance, in dual-threaded mode as well, and its rather bad, have to figure out the reason for this, probably gil, but queues could help
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py40
1 files changed, 31 insertions, 9 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 628e2a93..df3eaf11 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -40,14 +40,15 @@ class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _assert_sync_single_task(self, p):
+ def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
null_tasks = p.num_tasks() # in case we had some before
# add a simple task
# it iterates n items
- ni = 20
+ ni = 1000
assert ni % 2 == 0, "ni needs to be dividable by 2"
+ assert ni % 4 == 0, "ni needs to be dividable by 4"
def make_iter():
return iter(range(ni))
@@ -76,11 +77,18 @@ class TestThreadPool(TestBase):
# pull individual items
rc = p.add_task(task)
assert p.num_tasks() == 1 + null_tasks
+ st = time.time()
for i in range(ni):
items = rc.read(1)
assert len(items) == 1
- assert i == items[0]
+
+ # can't assert order in async mode
+ if not async:
+ assert i == items[0]
# END for each item
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
+
# it couldn't yet notice that the input is depleted as we pulled exaclty
# ni items - the next one would remove it. Instead, we delete our channel
# which triggers orphan handling
@@ -113,11 +121,13 @@ class TestThreadPool(TestBase):
rc = p.add_task(task)
# must read a specific item count
# count is still at ni / 2 - here we want more than that
- assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2 # make sure its uneven ;)
+ # 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
+ assert len(rc.read(ni / 2 + 2)) == ni / 2 + 2
+ # have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
+ # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
assert len(rc.read(ni / 2 - 2)) == ni / 2 - 2
- # END read chunks
- task._assert(ni / 4, ni) # read two times, got 4 processing steps
+ task._assert( 5, ni)
assert p.num_tasks() == null_tasks # depleted
# but this only hits if we want too many items, if we want less, it could
@@ -126,10 +136,18 @@ class TestThreadPool(TestBase):
task.reset(make_iter())
task.min_count = None
rc = p.add_task(task)
+ st = time.time()
for i in range(ni):
- assert rc.read(1)[0] == i
+ if async:
+ assert len(rc.read(1)) == 1
+ else:
+ assert rc.read(1)[0] == i
+ # END handle async mode
# END pull individual items
# too many processing counts ;)
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
+
task._assert(ni, ni)
assert p.num_tasks() == 1 + null_tasks
assert p.del_task(task) is p # del manually this time
@@ -183,7 +201,9 @@ class TestThreadPool(TestBase):
urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
assert p.num_tasks() == 2
- self._assert_sync_single_task(p)
+
+ ## SINGLE TASK #################
+ self._assert_single_task(p, False)
assert p.num_tasks() == 2
del(urc1)
del(urc2)
@@ -209,13 +229,15 @@ class TestThreadPool(TestBase):
assert len(threading.enumerate()) == num_threads + 1
# here we go
- self._assert_sync_single_task(p)
+ self._assert_single_task(p, False)
# SINGLE TASK ASYNC MODE
########################
# two threads to compete for a single task
+ p.set_size(2)
+ self._assert_single_task(p, True)
# DEPENDENT TASK ASYNC MODE