summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/async/pool.py1
-rw-r--r--lib/git/async/thread.py2
-rw-r--r--lib/git/async/util.py8
-rw-r--r--test/git/async/test_pool.py26
4 files changed, 13 insertions, 24 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 0aad90ae..dbc201a9 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -324,7 +324,6 @@ class Pool(object):
threadsafe to optimize item throughput.
:note: currently NOT threadsafe !"""
- print "set_size", size
assert size > -1, "Size cannot be negative"
# either start new threads, or kill existing ones.
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index b8d2e418..4d046a2f 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -146,9 +146,7 @@ class WorkerThread(TerminatableThread):
# we wait and block - to terminate, send the 'stop' method
tasktuple = gettask()
-
# needing exactly one function, and one arg
- assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
routine, arg = tasktuple
try:
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index b7750b0b..11ab75a6 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -101,10 +101,12 @@ class HSCondition(deque):
waiter.acquire() # get it the first time, no blocking
self.append(waiter)
- # in the momemnt we release our lock, someone else might actually resume
- self._lock.release()
- try: # restore state no matter what (e.g., KeyboardInterrupt)
+
+ try:
+ # restore state no matter what (e.g., KeyboardInterrupt)
# now we block, as we hold the lock already
+ # in the momemnt we release our lock, someone else might actually resume
+ self._lock.release()
if timeout is None:
waiter.acquire()
else:
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 4851f61b..5bb48cc2 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -199,7 +199,7 @@ class TestThreadPool(TestBase):
# add a simple task
# it iterates n items
- ni = 5000
+ ni = 1000
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
@@ -382,18 +382,18 @@ class TestThreadPool(TestBase):
# includes failure in center task, 'recursive' orphan cleanup
# This will also verify that the channel-close mechanism works
# t1 -> t2 -> t3
-
+
print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
null_tasks = pool.num_tasks()
- ni = 5000
+ ni = 1000
count = 3
aic = count + 2
make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+
ts, rcs = make_task()
assert len(ts) == aic
assert len(rcs) == aic
assert pool.num_tasks() == null_tasks + len(ts)
- print pool._tasks.nodes
# read(0)
#########
@@ -407,9 +407,6 @@ class TestThreadPool(TestBase):
# wait a tiny moment - there could still be something unprocessed on the
# queue, increasing the refcount
time.sleep(0.15)
- import gc
- print gc.get_referrers(ts[-1])
- print len(pool._queue)
assert sys.getrefcount(ts[-1]) == 2 # ts + call
assert sys.getrefcount(ts[0]) == 2 # ts + call
print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
@@ -467,15 +464,15 @@ class TestThreadPool(TestBase):
items = rcs[-1].read()
assert len(items) == fail_after
-
+
# MULTI-POOL
# If two pools are connected, this shold work as well.
# The second one has just one more thread
ts, rcs = make_task()
# connect verifier channel as feeder of the second pool
- p2 = ThreadPool(1)
- assert p2.size() == 1
+ p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
+ assert p2.size() == 0
p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
assert p2ts[0] is None # we have no feeder task
assert rcs[-1].pool_ref()() is pool # it didnt change the pool
@@ -501,14 +498,8 @@ class TestThreadPool(TestBase):
del(ts)
- print "del rcs"
- print rcs[-1]
- print sys.getrefcount(rcs[-1])
del(rcs)
- # TODO: make this work - something with the refcount goes wrong,
- # they never get cleaned up properly
- ts = pool._tasks.nodes
- print pool.num_tasks()
+
assert pool.num_tasks() == null_tasks
@@ -585,7 +576,6 @@ class TestThreadPool(TestBase):
# step one gear up - just one thread for now.
p.set_size(1)
assert p.size() == 1
- print len(threading.enumerate())
assert len(threading.enumerate()) == num_threads + 1
# deleting the pool stops its threads - just to be sure ;)
# Its not synchronized, hence we wait a moment