summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py129
1 files changed, 98 insertions, 31 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 7cb94a86..4851f61b 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -5,6 +5,7 @@ from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
+import weakref
import time
import sys
@@ -42,7 +43,9 @@ class _TestTaskBase(object):
print self.item_count, fc
assert self.item_count == fc
self.lock.release()
+ self._wlock.acquire()
assert self._num_writers == 0
+ self._wlock.release()
return self
@@ -122,31 +125,47 @@ class TestThreadPool(TestBase):
max_threads = cpu_count()
- def _add_task_chain(self, p, ni, count=1, fail_setup=list()):
+
+ def _make_proxy_method(self, t):
+ """required to prevent binding self into the method we call"""
+ wt = weakref.proxy(t)
+ return lambda item: wt.do_fun(item)
+
+ def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
"""Create a task chain of feeder, count transformers and order verifcator
to the pool p, like t1 -> t2 -> t3
:param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
make the third transformer fail after 20 items
+ :param feeder_channel: if set to a channel, it will be used as input of the
+ first transformation task. The respective first task in the return value
+ will be None.
+ :param id_offset: defines the id of the first transformation task, all subsequent
+ ones will add one
:return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
nt = p.num_tasks()
- feeder = self._make_iterator_task(ni)
- frc = p.add_task(feeder)
-
- assert p.num_tasks() == nt + 1
+ feeder = None
+ frc = feeder_channel
+ if feeder_channel is None:
+ feeder = self._make_iterator_task(ni)
+ frc = p.add_task(feeder)
+ # END handle specific feeder
rcs = [frc]
tasks = [feeder]
+ make_proxy_method = self._make_proxy_method
+
inrc = frc
for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc, None)
- t.fun = t.do_fun
+ t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
+
+ t.fun = make_proxy_method(t)
+ #t.fun = t.do_fun
inrc = p.add_task(t)
tasks.append(t)
rcs.append(inrc)
- assert p.num_tasks() == nt + 2 + tc
# END create count transformers
# setup failure
@@ -155,10 +174,10 @@ class TestThreadPool(TestBase):
# END setup failure
verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- verifier.fun = verifier.do_fun
+ #verifier.fun = verifier.do_fun
+ verifier.fun = make_proxy_method(verifier)
vrc = p.add_task(verifier)
- assert p.num_tasks() == nt + tc + 3
tasks.append(verifier)
rcs.append(vrc)
@@ -169,7 +188,7 @@ class TestThreadPool(TestBase):
:param taskcls: the actual iterator type to use
:param **kwargs: additional kwargs to be passed to the task"""
t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = t.do_fun
+ t.fun = self._make_proxy_method(t)
return t
def _assert_single_task(self, p, async=False):
@@ -385,6 +404,14 @@ class TestThreadPool(TestBase):
assert len(items) == ni
del(rcs)
assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ # wait a tiny moment - there could still be something unprocessed on the
+ # queue, increasing the refcount
+ time.sleep(0.15)
+ import gc
+ print gc.get_referrers(ts[-1])
+ print len(pool._queue)
+ assert sys.getrefcount(ts[-1]) == 2 # ts + call
+ assert sys.getrefcount(ts[0]) == 2 # ts + call
print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
@@ -444,25 +471,53 @@ class TestThreadPool(TestBase):
# MULTI-POOL
# If two pools are connected, this shold work as well.
# The second one has just one more thread
- if False:
- p2 = ThreadPool(1)
- assert p2.size() == 1
- p2ts, p2rcs = self._add_task_chain(p2, ni, count)
-
- ts, rcs = make_task()
-
-
- del(p2ts)
- del(p2rcs)
- assert p2.num_tasks() == 0
- del(p2)
-
- # in the end, we expect all tasks to be gone, automatically
- # order of deletion doesnt matter
+ ts, rcs = make_task()
+
+ # connect verifier channel as feeder of the second pool
+ p2 = ThreadPool(1)
+ assert p2.size() == 1
+ p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2ts[0] is None # we have no feeder task
+ assert rcs[-1].pool_ref()() is pool # it didnt change the pool
+ assert rcs[-1] is p2ts[1].reader()
+ assert p2.num_tasks() == len(p2ts)-1 # first is None
+
+ # reading from the last one will evaluate all pools correctly
+ print "read(0) multi-pool"
+ items = p2rcs[-1].read()
+ assert len(items) == ni
+
+ # now that both are connected, I can drop my handle to the reader
+ # without affecting the task-count, but whats more important:
+ # They remove their tasks correctly once we drop our references in the
+ # right order
+ del(p2ts)
+ assert p2rcs[0] is rcs[-1]
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+
del(ts)
+ print "del rcs"
+ print rcs[-1]
+ print sys.getrefcount(rcs[-1])
del(rcs)
+ # TODO: make this work - something with the refcount goes wrong,
+ # they never get cleaned up properly
+ ts = pool._tasks.nodes
print pool.num_tasks()
- assert pool.num_tasks() == null_tasks
+ assert pool.num_tasks() == null_tasks
+
+
+ # TODO: Test multi-read(1)
+
+ # in the end, we expect all tasks to be gone, automatically
+ # order of deletion doesnt matter
+
+
@@ -496,17 +551,28 @@ class TestThreadPool(TestBase):
# SINGLE TASK SERIAL SYNC MODE
##############################
- # put a few unrelated tasks that we forget about
- urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
- urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None))
+ # put a few unrelated tasks that we forget about - check ref counts and cleanup
+ t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None)
+ urc1 = p.add_task(t1)
+ urc2 = p.add_task(t2)
assert p.num_tasks() == 2
## SINGLE TASK #################
self._assert_single_task(p, False)
assert p.num_tasks() == 2
del(urc1)
- del(urc2)
+ assert p.num_tasks() == 1
+
+ p.remove_task(t2)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t2) == 2
+
+ t3 = TestThreadInputChannelTaskNode(urc2, "channel", None)
+ urc3 = p.add_task(t3)
+ assert p.num_tasks() == 1
+ del(urc3)
assert p.num_tasks() == 0
+ assert sys.getrefcount(t3) == 2
# DEPENDENT TASKS SYNC MODE
@@ -519,6 +585,7 @@ class TestThreadPool(TestBase):
# step one gear up - just one thread for now.
p.set_size(1)
assert p.size() == 1
+ print len(threading.enumerate())
assert len(threading.enumerate()) == num_threads + 1
# deleting the pool stops its threads - just to be sure ;)
# Its not synchronized, hence we wait a moment