summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 18:20:12 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 18:20:12 +0200
commite5c0002d069382db1768349bf0c5ff40aafbf140 (patch)
tree0af5a850aa240aff9bc90b0d0e4a31ab1d1ac9eb /test/git/async/test_pool.py
parent13dd59ba5b3228820841682b59bad6c22476ff66 (diff)
downloadgitpython-e5c0002d069382db1768349bf0c5ff40aafbf140.tar.gz
Revised task deletion works well, adjusted test to be creating new tasks all the time instead of reusing its own one, it was somewhat hard to manage its state over time and could cause bugs. It works okay, but it occasionally hangs, it appears to be an empty queue, have to gradually put certain things back in, although in the current mode of operation, it should never have empty queues from the pool to the user
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py51
1 files changed, 29 insertions, 22 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 3077dc32..82947988 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -6,14 +6,17 @@ from git.async.thread import terminate_threads
from git.async.util import cpu_count
import threading
import time
+import sys
class TestThreadTaskNode(InputIteratorThreadTask):
def __init__(self, *args, **kwargs):
super(TestThreadTaskNode, self).__init__(*args, **kwargs)
- self.reset(self._iterator)
self.should_fail = False
self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
self.plock = threading.Lock()
+ self.item_count = 0
+ self.process_count = 0
+ self._scheduled_items = 0
def do_fun(self, item):
self.lock.acquire()
@@ -23,13 +26,6 @@ class TestThreadTaskNode(InputIteratorThreadTask):
raise AssertionError("I am failing just for the fun of it")
return item
- def reset(self, iterator):
- self.process_count = 0
- self.item_count = 0
- self._exc = None
- self._iterator = iterator
- self._done = False
-
def process(self, count=1):
# must do it first, otherwise we might read and check results before
# the thread gets here :). Its a lesson !
@@ -68,7 +64,7 @@ class TestThreadPool(TestBase):
def _add_triple_task(self, p):
"""Add a triplet of feeder, transformer and finalizer to the pool, like
t1 -> t2 -> t3, return all 3 return channels in order"""
- t1 = TestThreadTaskNode(make_iter(), 'iterator', None)
+ # t1 = TestThreadTaskNode(make_task(), 'iterator', None)
# TODO:
def _assert_single_task(self, p, async=False):
@@ -81,12 +77,13 @@ class TestThreadPool(TestBase):
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
- def make_iter():
- return iter(range(ni))
+ def make_task():
+ t = TestThreadTaskNode(iter(range(ni)), 'iterator', None)
+ t.fun = t.do_fun
+ return t
# END utility
- task = TestThreadTaskNode(make_iter(), 'iterator', None)
- task.fun = task.do_fun
+ task = make_task()
assert p.num_tasks() == null_tasks
rc = p.add_task(task)
@@ -104,8 +101,9 @@ class TestThreadPool(TestBase):
# as the task is done, it should have been removed - we have read everything
assert task.is_done()
+ del(rc)
assert p.num_tasks() == null_tasks
- task.reset(make_iter())
+ task = make_task()
# pull individual items
rc = p.add_task(task)
@@ -126,14 +124,14 @@ class TestThreadPool(TestBase):
# it couldn't yet notice that the input is depleted as we pulled exaclty
# ni items - the next one would remove it. Instead, we delete our channel
# which triggers orphan handling
+ assert not task.is_done()
assert p.num_tasks() == 1 + null_tasks
del(rc)
assert p.num_tasks() == null_tasks
- task.reset(make_iter())
-
# test min count
# if we query 1 item, it will prepare ni / 2
+ task = make_task()
task.min_count = ni / 2
rc = p.add_task(task)
print "read(1)"
@@ -149,6 +147,7 @@ class TestThreadPool(TestBase):
print "read(%i)" % nri
items = rc.read(nri)
assert len(items) == nri
+ p.del_task(task)
assert p.num_tasks() == null_tasks
task._assert(2, ni) # two chunks, ni calls
@@ -158,31 +157,36 @@ class TestThreadPool(TestBase):
# test chunking
# we always want 4 chunks, these could go to individual nodes
- task.reset(make_iter())
+ task = make_task()
+ task.min_count = ni / 2 # restore previous value
task.max_chunksize = ni / 4 # 4 chunks
rc = p.add_task(task)
+
# must read a specific item count
# count is still at ni / 2 - here we want more than that
# 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
nri = ni / 2 + 2
- print "read(%i)" % nri
+ print "read(%i) chunksize set" % nri
items = rc.read(nri)
assert len(items) == nri
# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
nri = ni / 2 - 2
- print "read(%i)" % nri
+ print "read(%i) chunksize set" % nri
items = rc.read(nri)
assert len(items) == nri
task._assert( 5, ni)
+ assert task.is_done()
+ del(rc)
assert p.num_tasks() == null_tasks # depleted
# but this only hits if we want too many items, if we want less, it could
# still do too much - hence we set the min_count to the same number to enforce
# at least ni / 4 items to be preocessed, no matter what we request
- task.reset(make_iter())
+ task = make_task()
task.min_count = None
+ task.max_chunksize = ni / 4 # match previous setup
rc = p.add_task(task)
st = time.time()
print "read(1) * %i, chunksize set" % ni
@@ -203,8 +207,9 @@ class TestThreadPool(TestBase):
assert p.num_tasks() == null_tasks
# now with we set the minimum count to reduce the number of processing counts
- task.reset(make_iter())
+ task = make_task()
task.min_count = ni / 4
+ task.max_chunksize = ni / 4 # match previous setup
rc = p.add_task(task)
print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
for i in range(ni):
@@ -220,13 +225,15 @@ class TestThreadPool(TestBase):
# test failure
# on failure, the processing stops and the task is finished, keeping
# his error for later
- task.reset(make_iter())
+ task = make_task()
task.should_fail = True
rc = p.add_task(task)
print "read(0) with failure"
assert len(rc.read()) == 0 # failure on first item
print "done with everything"
assert isinstance(task.error(), AssertionError)
+ assert task.is_done() # on error, its marked done as well
+ del(rc)
assert p.num_tasks() == null_tasks