summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-11 11:52:24 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-11 14:20:25 +0200
commit583e6a25b0d891a2f531a81029f2bac0c237cbf9 (patch)
tree09025a39d44fa2a28a6533a0f969316652f974bc /test/git/async/test_pool.py
parent01eac1a959c1fa5894a86bf11e6b92f96762bdd8 (diff)
parent6d1212e8c412b0b4802bc1080d38d54907db879d (diff)
downloadgitpython-583e6a25b0d891a2f531a81029f2bac0c237cbf9.tar.gz
Merge branch 'channel' into taskdep
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py135
1 files changed, 97 insertions, 38 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 679bab31..5bb48cc2 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -5,6 +5,7 @@ from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
+import weakref
import time
import sys
@@ -42,7 +43,9 @@ class _TestTaskBase(object):
print self.item_count, fc
assert self.item_count == fc
self.lock.release()
-
+ self._wlock.acquire()
+ assert self._num_writers == 0
+ self._wlock.release()
return self
@@ -122,31 +125,47 @@ class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
+
+ def _make_proxy_method(self, t):
+ """required to prevent binding self into the method we call"""
+ wt = weakref.proxy(t)
+ return lambda item: wt.do_fun(item)
+
+ def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
make the third transformer fail after 20 items
+ :param feeder_channel: if set to a channel, it will be used as input of the
+ first transformation task. The respective first task in the return value
+ will be None.
+ :param id_offset: defines the id of the first transformation task, all subsequent
+ ones will add one
:return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
nt = p.num_tasks()
- feeder = self._make_iterator_task(ni)
- frc = p.add_task(feeder)
-
- assert p.num_tasks() == nt + 1
+ feeder = None
+ frc = feeder_channel
+ if feeder_channel is None:
+ feeder = self._make_iterator_task(ni)
+ frc = p.add_task(feeder)
+ # END handle specific feeder
rcs = [frc]
tasks = [feeder]
+ make_proxy_method = self._make_proxy_method
+
inrc = frc
for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc, None)
- t.fun = t.do_fun
+ t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+ t.fun = make_proxy_method(t)
+ #t.fun = t.do_fun
inrc = p.add_task(t)
tasks.append(t)
rcs.append(inrc)
- assert p.num_tasks() == nt + 2 + tc
# END create count transformers
# setup failure
@@ -155,10 +174,10 @@ class TestThreadPool(TestBase):
# END setup failure
verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- verifier.fun = verifier.do_fun
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
vrc = p.add_task(verifier)
- assert p.num_tasks() == nt + tc + 3
tasks.append(verifier)
rcs.append(vrc)
@@ -169,7 +188,7 @@ class TestThreadPool(TestBase):
:param taskcls: the actual iterator type to use
:param **kwargs: additional kwargs to be passed to the task"""
t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = t.do_fun
+ t.fun = self._make_proxy_method(t)
return t
def _assert_single_task(self, p, async=False):
@@ -180,7 +199,7 @@ class TestThreadPool(TestBase):
# add a simple task
# it iterates n items
- ni = 5000
+ ni = 1000
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
@@ -191,8 +210,8 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == null_tasks
rc = p.add_task(task)
assert p.num_tasks() == 1 + null_tasks
- assert isinstance(rc, RPoolChannel)
- assert task._out_wc is not None
+ assert isinstance(rc, PoolReader)
+ assert task._out_writer is not None
# pull the result completely - we should get one task, which calls its
# function once. In sync mode, the order matches
@@ -363,27 +382,33 @@ class TestThreadPool(TestBase):
# includes failure in center task, 'recursive' orphan cleanup
# This will also verify that the channel-close mechanism works
# t1 -> t2 -> t3
-
+
print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
null_tasks = pool.num_tasks()
- ni = 5000
+ ni = 1000
count = 3
aic = count + 2
make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+
ts, rcs = make_task()
assert len(ts) == aic
assert len(rcs) == aic
assert pool.num_tasks() == null_tasks + len(ts)
- print pool._tasks.nodes
# read(0)
#########
st = time.time()
items = rcs[-1].read()
elapsed = time.time() - st
+ print len(items), ni
assert len(items) == ni
del(rcs)
assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ # wait a tiny moment - there could still be something unprocessed on the
+ # queue, increasing the refcount
+ time.sleep(0.15)
+ assert sys.getrefcount(ts[-1]) == 2 # ts + call
+ assert sys.getrefcount(ts[0]) == 2 # ts + call
print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
@@ -439,28 +464,51 @@ class TestThreadPool(TestBase):
items = rcs[-1].read()
assert len(items) == fail_after
-
+
# MULTI-POOL
# If two pools are connected, this shold work as well.
# The second one has just one more thread
- if False:
- p2 = ThreadPool(1)
- assert p2.size() == 1
- p2ts, p2rcs = self._add_task_chain(p2, ni, count)
-
- ts, rcs = make_task()
-
-
- del(p2ts)
- del(p2rcs)
- assert p2.num_tasks() == 0
- del(p2)
+ ts, rcs = make_task()
+
+ # connect verifier channel as feeder of the second pool
+ p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
+ assert p2.size() == 0
+ p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2ts[0] is None # we have no feeder task
+ assert rcs[-1].pool_ref()() is pool # it didnt change the pool
+ assert rcs[-1] is p2ts[1].reader()
+ assert p2.num_tasks() == len(p2ts)-1 # first is None
+
+ # reading from the last one will evaluate all pools correctly
+ print "read(0) multi-pool"
+ items = p2rcs[-1].read()
+ assert len(items) == ni
+
+ # now that both are connected, I can drop my handle to the reader
+ # without affecting the task-count, but whats more important:
+ # They remove their tasks correctly once we drop our references in the
+ # right order
+ del(p2ts)
+ assert p2rcs[0] is rcs[-1]
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+
+ del(ts)
+ del(rcs)
+
+ assert pool.num_tasks() == null_tasks
+
+
+ # TODO: Test multi-read(1)
# in the end, we expect all tasks to be gone, automatically
# order of deletion doesnt matter
- del(ts)
- del(rcs)
- assert pool.num_tasks() == null_tasks
+
+
@@ -494,17 +542,28 @@ class TestThreadPool(TestBase):
# SINGLE TASK SERIAL SYNC MODE
##############################
- # put a few unrelated tasks that we forget about
- urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
- urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+ # put a few unrelated tasks that we forget about - check ref counts and cleanup
+ t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+ urc1 = p.add_task(t1)
+ urc2 = p.add_task(t2)
assert p.num_tasks() == 2
## SINGLE TASK #################
self._assert_single_task(p, False)
assert p.num_tasks() == 2
del(urc1)
- del(urc2)
+ assert p.num_tasks() == 1
+
+ p.remove_task(t2)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t2) == 2
+
+ t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+ urc3 = p.add_task(t3)
+ assert p.num_tasks() == 1
+ del(urc3)
assert p.num_tasks() == 0
+ assert sys.getrefcount(t3) == 2
# DEPENDENT TASKS SYNC MODE