author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 12:41:20 +0200
commit | f91495e271597034226f1b9651345091083172c4 (patch)
tree | e0e2aa63b7dc649083858366eaedb6ac4cc5739b /test/git/async/test_pool.py
parent | 7c1169f6ea406fec1e26e99821e18e66437e65eb (diff)
parent | 7a0b79ee574999ecbc76696506352e4a5a0d7159 (diff)
download | gitpython-f91495e271597034226f1b9651345091083172c4.tar.gz
Merge branch 'async'
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 476
1 file changed, 476 insertions, 0 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
new file mode 100644
index 00000000..aab618aa
--- /dev/null
+++ b/test/git/async/test_pool.py
@@ -0,0 +1,476 @@
+"""Channel testing"""
+from test.testlib import *
+from task import *
+
+from git.async.pool import *
+from git.async.thread import terminate_threads
+from git.async.util import cpu_count
+
+import threading
+import weakref
+import time
+import sys
+
+
+
+class TestThreadPool(TestBase):
+
+    max_threads = cpu_count()
+
+    def _assert_single_task(self, p, async=False):
+        """Performs testing in a synchronized environment"""
+        print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
+        null_tasks = p.num_tasks()  # in case we had some before
+
+        # add a simple task
+        # it iterates n items
+        ni = 1000
+        assert ni % 2 == 0, "ni needs to be divisible by 2"
+        assert ni % 4 == 0, "ni needs to be divisible by 4"
+
+        make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
+
+        task = make_task()
+
+        assert p.num_tasks() == null_tasks
+        rc = p.add_task(task)
+        assert p.num_tasks() == 1 + null_tasks
+        assert isinstance(rc, PoolReader)
+        assert task._out_writer is not None
+
+        # pull the result completely - we should get one task, which calls its
+        # function once. In sync mode, the order matches.
+        print "read(0)"
+        items = rc.read()
+        assert len(items) == ni
+        task._assert(1, ni)
+        if not async:
+            assert items[0] == 0 and items[-1] == ni-1
+
+        # as the task is done, it should have been removed - we have read everything
+        assert task.is_done()
+        del(rc)
+        assert p.num_tasks() == null_tasks
+        task = make_task()
+
+        # pull individual items
+        rc = p.add_task(task)
+        assert p.num_tasks() == 1 + null_tasks
+        st = time.time()
+        print "read(1) * %i" % ni
+        for i in range(ni):
+            items = rc.read(1)
+            assert len(items) == 1
+
+            # can't assert order in async mode
+            if not async:
+                assert i == items[0]
+        # END for each item
+        elapsed = time.time() - st
+        print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
+
+        # it couldn't yet notice that the input is depleted, as we pulled exactly
+        # ni items - the next one would remove it. Instead, we delete our channel,
+        # which triggers orphan handling.
+        assert not task.is_done()
+        assert p.num_tasks() == 1 + null_tasks
+        del(rc)
+        assert p.num_tasks() == null_tasks
+
+        # test min count
+        # if we query 1 item, it will prepare ni / 2
+        task = make_task()
+        task.min_count = ni / 2
+        rc = p.add_task(task)
+        print "read(1)"
+        items = rc.read(1)
+        assert len(items) == 1 and items[0] == 0  # processes ni / 2
+        print "read(1)"
+        items = rc.read(1)
+        assert len(items) == 1 and items[0] == 1  # processes nothing
+        # rest - it has ni/2 - 2 on the queue, and pulls ni-2.
+        # It wants too much, so the task realizes it's done. The task
+        # doesn't care about the items in its output channel.
+        nri = ni-2
+        print "read(%i)" % nri
+        items = rc.read(nri)
+        assert len(items) == nri
+        p.remove_task(task)
+        assert p.num_tasks() == null_tasks
+        task._assert(2, ni)  # two chunks, ni calls
+
+        # it's already done and gives us no more; it's still okay to use it though,
+        # as a task doesn't have to be in the graph to allow reading its produced
+        # items
+        print "read(0) on closed"
+        # it can happen that a thread closes the channel just a tiny fraction of time
+        # after we check this, so the test fails, although it is nearly closed.
+        # When we start reading, we should wake up once it sends its signal
+        # assert task.is_closed()
+        assert len(rc.read()) == 0
+
+        # test chunking
+        # we always want 4 chunks, these could go to individual nodes
+        task = make_task()
+        task.min_count = ni / 2      # restore previous value
+        task.max_chunksize = ni / 4  # 4 chunks
+        rc = p.add_task(task)
+
+        # must read a specific item count.
+        # count is still at ni / 2 - here we want more than that:
+        # 2 steps with n / 4 items, + 1 step with n / 4 items to get + 2
+        nri = ni / 2 + 2
+        print "read(%i) chunksize set" % nri
+        items = rc.read(nri)
+        assert len(items) == nri
+        # have n / 4 - 2 items on queue, want n / 4 in first chunk, causing 1 processing
+        # ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
+        nri = ni / 2 - 2
+        print "read(%i) chunksize set" % nri
+        items = rc.read(nri)
+        assert len(items) == nri
+
+        task._assert(5, ni)
+
+        # delete the handle first, causing the task to be removed and to be set
+        # done. We check for the set-done state later. Depending on the timing,
+        # the task is not yet set done when we are checking it, because we were
+        # scheduled in before the flag could be set.
+        del(rc)
+        assert task.is_done()
+        assert p.num_tasks() == null_tasks  # depleted
+
+        # but this only hits if we want too many items; if we want less, it could
+        # still do too much - hence we set the min_count to the same number to enforce
+        # at least ni / 4 items to be processed, no matter what we request
+        task = make_task()
+        task.min_count = None
+        task.max_chunksize = ni / 4  # match previous setup
+        rc = p.add_task(task)
+        st = time.time()
+        print "read(1) * %i, chunksize set" % ni
+        for i in range(ni):
+            if async:
+                assert len(rc.read(1)) == 1
+            else:
+                assert rc.read(1)[0] == i
+            # END handle async mode
+        # END pull individual items
+        # too many processing counts ;)
+        elapsed = time.time() - st
+        print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
+
+        task._assert(ni, ni)
+        assert p.num_tasks() == 1 + null_tasks
+        assert p.remove_task(task) is p  # del manually this time
+        assert p.num_tasks() == null_tasks
+
+        # now we set the minimum count to reduce the number of processing counts
+        task = make_task()
+        task.min_count = ni / 4
+        task.max_chunksize = ni / 4  # match previous setup
+        rc = p.add_task(task)
+        print "read(1) * %i, min_count=%i + chunksize" % (ni, task.min_count)
+        for i in range(ni):
+            items = rc.read(1)
+            assert len(items) == 1
+            if not async:
+                assert items[0] == i
+        # END for each item
+        task._assert(ni / task.min_count, ni)
+        del(rc)
+        assert p.num_tasks() == null_tasks
+
+        # test failure
+        # on failure, the processing stops and the task is finished, keeping
+        # its error for later
+        task = make_task()
+        task.should_fail = True
+        rc = p.add_task(task)
+        print "read(0) with failure"
+        assert len(rc.read()) == 0  # failure on first item
+
+        assert isinstance(task.error(), AssertionError)
+        assert task.is_done()  # on error, it's marked done as well
+        del(rc)
+        assert p.num_tasks() == null_tasks
+
+        # test failure after ni / 2 items.
+        # This makes sure it correctly closes the channel on failure to prevent blocking
+        nri = ni/2
+        task = make_task(TestFailureThreadTask, fail_after=ni/2)
+        rc = p.add_task(task)
+        assert len(rc.read()) == nri
+        assert task.is_done()
+        assert isinstance(task.error(), AssertionError)
+
+        print >> sys.stderr, "done with everything"
+
+
+
+    def _assert_async_dependent_tasks(self, pool):
+        # includes failure in center task, 'recursive' orphan cleanup.
+        # This will also verify that the channel-close mechanism works.
+        # t1 -> t2 -> t3
+
+        print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
+        null_tasks = pool.num_tasks()
+        ni = 1000
+        count = 3
+        aic = count + 2
+        make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
+
+        ts, rcs = make_task()
+        assert len(ts) == aic
+        assert len(rcs) == aic
+        assert pool.num_tasks() == null_tasks + len(ts)
+
+        # read(0)
+        #########
+        st = time.time()
+        items = rcs[-1].read()
+        elapsed = time.time() - st
+        print len(items), ni
+        assert len(items) == ni
+        del(rcs)
+        assert pool.num_tasks() == 0  # tasks depleted, all done, no handles
+        # wait a tiny moment - there could still be something unprocessed on the
+        # queue, increasing the refcount
+        time.sleep(0.15)
+        assert sys.getrefcount(ts[-1]) == 2  # ts + call
+        assert sys.getrefcount(ts[0]) == 2   # ts + call
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
+
+
+        # read(1)
+        #########
+        ts, rcs = make_task()
+        st = time.time()
+        for i in xrange(ni):
+            items = rcs[-1].read(1)
+            assert len(items) == 1
+        # END for each item to pull
+        elapsed_single = time.time() - st
+        # another read yields nothing, it's empty
+        assert len(rcs[-1].read()) == 0
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+        # read with min-count size
+        ###########################
+        # must be faster, as it will read ni / 4 chunks.
+        # It's enough to set one task, as it will force all others in the chain
+        # to min_size as well.
+        ts, rcs = make_task()
+        assert pool.num_tasks() == len(ts)
+        nri = ni / 4
+        ts[-1].min_count = nri
+        st = time.time()
+        for i in xrange(ni):
+            items = rcs[-1].read(1)
+            assert len(items) == 1
+        # END for each item to read
+        elapsed_minsize = time.time() - st
+        # it's empty
+        assert len(rcs[-1].read()) == 0
+        print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+        # it should have been a bit faster at least, and most of the time it is.
+        # Sometimes it's not, mainly because:
+        # * The test tasks lock a lot, hence they slow down the system
+        # * Each read will still trigger the pool to evaluate, causing some overhead
+        #   even though there are enough items on the queue in that case. Keeping
+        #   track of the scheduled items helped there, but it caused further unacceptable
+        #   slowdown
+        # assert elapsed_minsize < elapsed_single
+
+
+        # read with failure
+        ###################
+        # it should recover and give at least fail_after items
+        # t1 -> x -> t3
+        fail_after = ni/2
+        ts, rcs = make_task(fail_setup=[(0, fail_after)])
+        items = rcs[-1].read()
+        assert len(items) == fail_after
+
+
+        # MULTI-POOL
+        # If two pools are connected, this should work as well.
+        # The second one has just one more thread
+        ts, rcs = make_task()
+
+        # connect verifier channel as feeder of the second pool
+        p2 = ThreadPool(0)  # don't spawn new threads, they have the tendency not to wake up on mutexes
+        assert p2.size() == 0
+        p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+        assert p2ts[0] is None  # we have no feeder task
+        assert rcs[-1].pool_ref()() is pool  # it didn't change the pool
+        assert rcs[-1] is p2ts[1].reader()
+        assert p2.num_tasks() == len(p2ts)-1  # first is None
+
+        # reading from the last one will evaluate all pools correctly
+        print "read(0) multi-pool"
+        st = time.time()
+        items = p2rcs[-1].read()
+        elapsed = time.time() - st
+        assert len(items) == ni
+
+        print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+
+        # lose the handles of the second pool to allow others to go as well
+        del(p2rcs); del(p2ts)
+        assert p2.num_tasks() == 0
+
+        # now we lost our old handles as well, and the tasks go away
+        ts, rcs = make_task()
+        assert pool.num_tasks() == len(ts)
+
+        p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+        assert p2.num_tasks() == len(p2ts) - 1
+
+        # Test multi-read(1)
+        print "read(1) * %i" % ni
+        reader = rcs[-1]
+        st = time.time()
+        for i in xrange(ni):
+            items = reader.read(1)
+            assert len(items) == 1
+        # END for each item to get
+        elapsed = time.time() - st
+        del(reader)  # decrement refcount
+
+        print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+        # another read is empty
+        assert len(rcs[-1].read()) == 0
+
+        # now that both are connected, I can drop my handle to the reader
+        # without affecting the task-count, but what's more important:
+        # they remove their tasks correctly once we drop our references in the
+        # right order
+        del(p2ts)
+        assert p2rcs[0] is rcs[-1]
+        del(p2rcs)
+        assert p2.num_tasks() == 0
+        del(p2)
+
+        assert pool.num_tasks() == null_tasks + len(ts)
+
+
+        del(ts)
+        del(rcs)
+
+        assert pool.num_tasks() == null_tasks
+
+
+        # ASSERTION: We already tested that one pool behaves correctly when an error
+        # occurs - if two pools handle their ref-counts correctly, which they
+        # do if we are here, then they should handle errors happening during
+        # the task processing as expected as well. Hence we can safely skip it here.
+
+
+
+    @terminate_threads
+    def test_base(self):
+        max_wait_attempts = 3
+        sleep_time = 0.1
+        for mc in range(max_wait_attempts):
+            # wait for threads to die
+            if len(threading.enumerate()) != 1:
+                time.sleep(sleep_time)
+        # END for each attempt
+        assert len(threading.enumerate()) == 1, "Waited %i * %f s for threads to die, they are still alive" % (max_wait_attempts, sleep_time)
+
+        p = ThreadPool()
+
+        # default pools have no workers
+        assert p.size() == 0
+
+        # increase and decrease the size
+        num_threads = len(threading.enumerate())
+        for i in range(self.max_threads):
+            p.set_size(i)
+            assert p.size() == i
+            assert len(threading.enumerate()) == num_threads + i
+
+        for i in range(self.max_threads, -1, -1):
+            p.set_size(i)
+            assert p.size() == i
+
+        assert p.size() == 0
+        # threads should be killed already, but we give them a tiny amount of time
+        # just to be sure
+        time.sleep(0.05)
+        assert len(threading.enumerate()) == num_threads
+
+        # SINGLE TASK SERIAL SYNC MODE
+        ##############################
+        # put a few unrelated tasks that we forget about - check ref counts and cleanup
+        t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
+        urc1 = p.add_task(t1)
+        urc2 = p.add_task(t2)
+        assert p.num_tasks() == 2
+
+        ## SINGLE TASK #################
+        self._assert_single_task(p, False)
+        assert p.num_tasks() == 2
+        del(urc1)
+        assert p.num_tasks() == 1
+
+        p.remove_task(t2)
+        assert p.num_tasks() == 0
+        assert sys.getrefcount(t2) == 2
+
+        t3 = TestChannelThreadTask(urc2, "channel", None)
+        urc3 = p.add_task(t3)
+        assert p.num_tasks() == 1
+        del(urc3)
+        assert p.num_tasks() == 0
+        assert sys.getrefcount(t3) == 2
+
+
+        # DEPENDENT TASKS SYNC MODE
+        ###########################
+        self._assert_async_dependent_tasks(p)
+
+
+        # SINGLE TASK THREADED ASYNC MODE ( 1 thread )
+        ##############################################
+        # step one gear up - just one thread for now.
+        p.set_size(1)
+        assert p.size() == 1
+        assert len(threading.enumerate()) == num_threads + 1
+        # deleting the pool stops its threads - just to be sure ;)
+        # It's not synchronized, hence we wait a moment.
+        del(p)
+        time.sleep(0.05)
+        assert len(threading.enumerate()) == num_threads
+
+        p = ThreadPool(1)
+        assert len(threading.enumerate()) == num_threads + 1
+
+        # here we go
+        self._assert_single_task(p, True)
+
+
+
+        # SINGLE TASK ASYNC MODE ( 2 threads )
+        ######################################
+        # two threads to compete for a single task
+        p.set_size(2)
+        self._assert_single_task(p, True)
+
+        # real stress test - should be native on every dual-core cpu with 2 hardware
+        # threads per core
+        p.set_size(4)
+        self._assert_single_task(p, True)
+
+
+        # DEPENDENT TASK ASYNC MODE
+        ###########################
+        self._assert_async_dependent_tasks(p)
+
+        print >> sys.stderr, "Done with everything"
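
For readers who want the gist of the API these tests exercise without wading through the assertions, here is a minimal usage sketch. It is not part of the commit; the constructor signature IteratorThreadTask(iterator, id, function) and the module path git.async.task are assumptions inferred from how the test's task factories are used, while ThreadPool, add_task, read, is_done and num_tasks are confirmed by the diff itself.

    # Minimal sketch of the pool workflow exercised above (assumptions noted).
    from git.async.pool import ThreadPool
    from git.async.task import IteratorThreadTask  # assumed module/class name

    # A task that squares 100 integers; the pool pulls items from the iterator
    # and pushes results into the channel behind the returned reader.
    task = IteratorThreadTask(iter(range(100)), "squares", lambda x: x * x)

    p = ThreadPool(0)          # size 0: items are processed on the caller's thread during read()
    reader = p.add_task(task)  # a PoolReader connected to the task's output channel

    items = reader.read()      # read(0) drains everything; order is stable in sync mode
    assert len(items) == 100 and items[0] == 0 and items[-1] == 99 * 99
    assert task.is_done()
    del(reader)                # dropping the last reader removes the depleted task
    assert p.num_tasks() == 0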
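The min_count / max_chunksize sections are the subtlest part of the suite: min_count puts a lower bound on how many items a read prepares, while max_chunksize caps how many items a worker processes per invocation. A sketch of the two knobs, under the same assumed task type as above:

    # Batching knobs, mirroring the chunking tests above (assumptions as noted).
    from git.async.pool import ThreadPool
    from git.async.task import IteratorThreadTask  # assumed name, see above

    p = ThreadPool(2)  # two worker threads
    task = IteratorThreadTask(iter(range(1000)), "batched", lambda x: x)
    task.min_count = 500       # even read(1) triggers processing of at least 500 items
    task.max_chunksize = 250   # workers receive work in chunks of at most 250 items

    reader = p.add_task(task)
    assert len(reader.read(1)) == 1  # one item returned, ~min_count already computed
    rest = reader.read()             # served largely from the queue built up by min_count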
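The multi-pool test connects a reader from one pool as the input channel of a task in a second pool, so that reading from the second pool transparently drives the first. A sketch of that topology; ChannelThreadTask is an assumed base-class name (the diff only shows the test subclass TestChannelThreadTask):

    # Chaining two pools through a channel task (class names partly assumed).
    from git.async.pool import ThreadPool
    from git.async.task import IteratorThreadTask, ChannelThreadTask  # assumed names

    p1 = ThreadPool(1)
    p2 = ThreadPool(1)

    producer = IteratorThreadTask(iter(range(50)), "producer", lambda x: x + 1)
    r1 = p1.add_task(producer)

    # The consumer's input is the other pool's reader, so pulling from p2
    # schedules work on p1 as well - exactly what the MULTI-POOL section asserts.
    consumer = ChannelThreadTask(r1, "consumer", lambda x: x * 2)
    r2 = p2.add_task(consumer)

    assert len(r2.read()) == 50      # drains the whole chain across both pools
    p1.set_size(0); p2.set_size(0)   # stop worker threads when done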