Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--  test/git/async/test_pool.py  476
1 file changed, 476 insertions(+), 0 deletions(-)
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
new file mode 100644
index 00000000..aab618aa
--- /dev/null
+++ b/test/git/async/test_pool.py
@@ -0,0 +1,476 @@
+"""Channel testing"""
+from test.testlib import *
+from task import *
+
+from git.async.pool import *
+from git.async.thread import terminate_threads
+from git.async.util import cpu_count
+
+import threading
+import weakref
+import time
+import sys
+
+
+
+class TestThreadPool(TestBase):
+
+ max_threads = cpu_count()
+
+ def _assert_single_task(self, p, async=False):
+ """Performs testing in a synchronized environment"""
+ print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size())
+ null_tasks = p.num_tasks() # in case we had some before
+
+ # add a simple task
+ # it iterates n items
+ ni = 1000
+ assert ni % 2 == 0, "ni needs to be divisible by 2"
+ assert ni % 4 == 0, "ni needs to be divisible by 4"
+
+ make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
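+
+ # A sketch of what make_iterator_task (imported from the task module)
+ # presumably builds, judging by the TestThreadTask constructor used in
+ # test_base further below; the factory signature is an illustrative
+ # guess, not the actual API:
+ #
+ # def make_iterator_task(n, taskcls=TestThreadTask, **kwargs):
+ #     # a task pulling n integers from an iterator through its (no-op) function
+ #     return taskcls(iter(range(n)), 'iterator', None, **kwargs)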
+
+ task = make_task()
+
+ assert p.num_tasks() == null_tasks
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ assert isinstance(rc, PoolReader)
+ assert task._out_writer is not None
+
+ # pull the result completely - the task should process everything in a
+ # single chunk, calling its function once per item. In sync mode, the order matches
+ print "read(0)"
+ items = rc.read()
+ assert len(items) == ni
+ task._assert(1, ni)
+ if not async:
+ assert items[0] == 0 and items[-1] == ni-1
+
+ # as the task is done, it should have been removed - we have read everything
+ assert task.is_done()
+ del(rc)
+ assert p.num_tasks() == null_tasks
+ task = make_task()
+
+ # pull individual items
+ rc = p.add_task(task)
+ assert p.num_tasks() == 1 + null_tasks
+ st = time.time()
+ print "read(1) * %i" % ni
+ for i in range(ni):
+ items = rc.read(1)
+ assert len(items) == 1
+
+ # can't assert order in async mode
+ if not async:
+ assert i == items[0]
+ # END for each item
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, p.size(), elapsed, ni / elapsed)
+
+ # it couldn't yet notice that the input is depleted as we pulled exactly
+ # ni items - the next read would remove it. Instead, we delete our channel,
+ # which triggers orphan handling
+ assert not task.is_done()
+ assert p.num_tasks() == 1 + null_tasks
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test min count
+ # if we query 1 item, it will prepare ni / 2
+ task = make_task()
+ task.min_count = ni / 2
+ rc = p.add_task(task)
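+
+ # worked numbers for the three reads below (ni = 1000, min_count = 500):
+ # read(1) finds an empty queue and triggers one run of min_count items,
+ # read(1) is served from the 499 queued items without any processing,
+ # read(998) drains the remaining 498 and triggers the second, final run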
+ print "read(1)"
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 0 # processes ni / 2
+ print "read(1)"
+ items = rc.read(1)
+ assert len(items) == 1 and items[0] == 1 # processes nothing
+ # rest - it has ni/2 - 2 items on the queue and pulls ni-2.
+ # It wants too much, so the task realizes it's done. The task
+ # doesn't care about the items left in its output channel
+ nri = ni-2
+ print "read(%i)" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+ p.remove_task(task)
+ assert p.num_tasks() == null_tasks
+ task._assert(2, ni) # two chunks, ni calls
+
+ # it's already done and gives us no more items, but it's still okay to
+ # use: a task doesn't have to be in the graph to allow reading the
+ # items it produced
+ print "read(0) on closed"
+ # it can happen that a thread closes the channel just a tiny fraction of time
+ # after we check it, failing the test even though the channel is as good
+ # as closed. Once we start reading, we will wake up when it sends its signal
+ # assert task.is_closed()
+ assert len(rc.read()) == 0
+
+ # test chunking
+ # we always want 4 chunks, these could go to individual nodes
+ task = make_task()
+ task.min_count = ni / 2 # restore previous value
+ task.max_chunksize = ni / 4 # 4 chunks
+ rc = p.add_task(task)
+
+ # must read a specific item count
+ # min_count is still at ni / 2 - here we want more than that:
+ # 2 chunks of ni / 4 items, plus 1 more chunk of ni / 4 to cover the + 2
+ nri = ni / 2 + 2
+ print "read(%i) chunksize set" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+ # ni / 4 - 2 items are left on the queue; asking for ni / 4 in the first chunk
+ # causes one processing run (4 in total), and the remaining ni / 4 - 2 cause another
+ nri = ni / 2 - 2
+ print "read(%i) chunksize set" % nri
+ items = rc.read(nri)
+ assert len(items) == nri
+
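+ # tally for the chunk assertion below: read(502) scheduled 3 chunks of
+ # ni / 4, read(498) scheduled 2 more (the last found the iterator depleted),
+ # so the task processed in 5 chunked runs while producing exactly ni items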
+ task._assert(5, ni)
+
+ # delete the handle first, causing the task to be removed and marked
+ # done. We check the done-state right afterwards; depending on the timing,
+ # the task may not be marked done yet when we check it, as we can be
+ # scheduled in before the flag gets set
+ del(rc)
+ assert task.is_done()
+ assert p.num_tasks() == null_tasks # depleted
+
+ # but this only hits if we want too many items; if we want less, it could
+ # still process too much. Here we unset the min_count entirely, so every
+ # single read triggers its own processing run, no matter what we request
+ task = make_task()
+ task.min_count = None
+ task.max_chunksize = ni / 4 # match previous setup
+ rc = p.add_task(task)
+ st = time.time()
+ print "read(1) * %i, chunksize set" % ni
+ for i in range(ni):
+ if async:
+ assert len(rc.read(1)) == 1
+ else:
+ assert rc.read(1)[0] == i
+ # END handle async mode
+ # END pull individual items
+ # too many processing counts ;)
+ elapsed = time.time() - st
+ print >> sys.stderr, "Threadpool: processed %i individual items in chunks of %i, with %i threads, one at a time, in %f s ( %f items / s )" % (ni, ni/4, p.size(), elapsed, ni / elapsed)
+
+ task._assert(ni, ni)
+ assert p.num_tasks() == 1 + null_tasks
+ assert p.remove_task(task) is p # del manually this time
+ assert p.num_tasks() == null_tasks
+
+ # now we set the minimum count to reduce the number of processing runs
+ task = make_task()
+ task.min_count = ni / 4
+ task.max_chunksize = ni / 4 # match previous setup
+ rc = p.add_task(task)
+ print "read(1) * %i, min_count%i + chunksize" % (ni, task.min_count)
+ for i in range(ni):
+ items = rc.read(1)
+ assert len(items) == 1
+ if not async:
+ assert items[0] == i
+ # END for each item
+ task._assert(ni / task.min_count, ni)
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test failure
+ # on failure, the processing stops and the task is finished, keeping
+ # its error for later
+ task = make_task()
+ task.should_fail = True
+ rc = p.add_task(task)
+ print "read(0) with failure"
+ assert len(rc.read()) == 0 # failure on first item
+
+ assert isinstance(task.error(), AssertionError)
+ assert task.is_done() # on error, it's marked done as well
+ del(rc)
+ assert p.num_tasks() == null_tasks
+
+ # test failure after ni / 2 items
+ # This makes sure it correctly closes the channel on failure to prevent blocking
+ nri = ni/2
+ task = make_task(TestFailureThreadTask, fail_after=ni/2)
+ rc = p.add_task(task)
+ assert len(rc.read()) == nri
+ assert task.is_done()
+ assert isinstance(task.error(), AssertionError)
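+
+ # TestFailureThreadTask comes from the task module; judging by its use
+ # here, it is assumed to wrap the per-item function so that it raises an
+ # AssertionError once fail_after items have passed through, roughly
+ # (method and attribute names below are guesses, not the actual API):
+ #
+ # class TestFailureThreadTask(TestThreadTask):
+ #     def do_fun(self, item):
+ #         if item >= self.fail_after:
+ #             raise AssertionError("simulated failure after %i items" % self.fail_after)
+ #         return item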
+
+ print >> sys.stderr, "done with everything"
+
+
+
+ def _assert_async_dependent_tasks(self, pool):
+ # includes failure in center task, 'recursive' orphan cleanup
+ # This will also verify that the channel-close mechanism works
+ # t1 -> t2 -> t3
+
+ print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size()
+ null_tasks = pool.num_tasks()
+ ni = 1000
+ count = 3
+ aic = count + 2
+ make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
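+
+ # assumed chain layout built by add_task_chain: one feeder task, count
+ # chained transformer tasks and one verifier task, wired t1 -> t2 -> t3,
+ # which is why the all-in-chain count aic equals count + 2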
+
+ ts, rcs = make_task()
+ assert len(ts) == aic
+ assert len(rcs) == aic
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+ # read(0)
+ #########
+ st = time.time()
+ items = rcs[-1].read()
+ elapsed = time.time() - st
+ print len(items), ni
+ assert len(items) == ni
+ del(rcs)
+ assert pool.num_tasks() == 0 # tasks depleted, all done, no handles
+ # wait a tiny moment - there could still be something unprocessed on the
+ # queue, increasing the refcount
+ time.sleep(0.15)
+ assert sys.getrefcount(ts[-1]) == 2 # ts + call
+ assert sys.getrefcount(ts[0]) == 2 # ts + call
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed)
+
+
+ # read(1)
+ #########
+ ts, rcs = make_task()
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to pull
+ elapsed_single = time.time() - st
+ # another read yields nothing, it's empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single)
+
+
+ # read with min-count size
+ ###########################
+ # must be faster, as it will read in chunks of ni / 4
+ # It's enough to set it on one task, as it will force all others in the
+ # chain to the same min_count as well.
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+ nri = ni / 4
+ ts[-1].min_count = nri
+ st = time.time()
+ for i in xrange(ni):
+ items = rcs[-1].read(1)
+ assert len(items) == 1
+ # END for each item to read
+ elapsed_minsize = time.time() - st
+ # it's empty
+ assert len(rcs[-1].read()) == 0
+ print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize)
+
+ # it should have been at least a bit faster, and most of the time it is.
+ # Sometimes it's not, mainly because:
+ # * The test tasks lock a lot, hence they slow down the system
+ # * Each read still triggers the pool to evaluate, causing some overhead
+ # even though there are enough items on the queue in that case. Keeping
+ # track of the scheduled items helped there, but it caused a further unacceptable
+ # slowdown
+ # assert elapsed_minsize < elapsed_single
+
+
+ # read with failure
+ ###################
+ # it should recover and give at least fail_after items
+ # t1 -> x -> t3
+ fail_after = ni/2
+ ts, rcs = make_task(fail_setup=[(0, fail_after)])
+ items = rcs[-1].read()
+ assert len(items) == fail_after
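+
+ # fail_setup=[(0, fail_after)] presumably marks one chain task (the
+ # middle one, per the sketch above) to fail after fail_after items;
+ # everything produced up to that point still travels down the chain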
+
+
+ # MULTI-POOL
+ # If two pools are connected, this should work as well.
+ # The second one has just one more thread
+ ts, rcs = make_task()
+
+ # connect verifier channel as feeder of the second pool
+ p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
+ assert p2.size() == 0
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2ts[0] is None # we have no feeder task
+ assert rcs[-1].pool_ref()() is pool # it didn't change the pool
+ assert rcs[-1] is p2ts[1].reader()
+ assert p2.num_tasks() == len(p2ts)-1 # first is None
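+
+ # resulting topology (sketch): the verifier reader of the first pool
+ # becomes the input channel of the second chain, which lives in p2:
+ #
+ # pool: feeder -> chain -> verifier --> p2: chain -> verifier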
+
+ # reading from the last one will evaluate all pools correctly
+ print "read(0) multi-pool"
+ st = time.time()
+ items = p2rcs[-1].read()
+ elapsed = time.time() - st
+ assert len(items) == ni
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+
+ # lose the handles of the second pool to allow the others to go as well
+ del(p2rcs); del(p2ts)
+ assert p2.num_tasks() == 0
+
+ # now we lost our old handles as well, and the tasks go away
+ ts, rcs = make_task()
+ assert pool.num_tasks() == len(ts)
+
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ assert p2.num_tasks() == len(p2ts) - 1
+
+ # Test multi-read(1)
+ print "read(1) * %i" % ni
+ reader = rcs[-1]
+ st = time.time()
+ for i in xrange(ni):
+ items = reader.read(1)
+ assert len(items) == 1
+ # END for each item to get
+ elapsed = time.time() - st
+ del(reader) # decrement refcount
+
+ print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed)
+
+ # another read is empty
+ assert len(rcs[-1].read()) == 0
+
+ # now that both are connected, I can drop my handle to the reader
+ # without affecting the task-count. What's more important:
+ # they remove their tasks correctly once we drop our references in the
+ # right order
+ del(p2ts)
+ assert p2rcs[0] is rcs[-1]
+ del(p2rcs)
+ assert p2.num_tasks() == 0
+ del(p2)
+
+ assert pool.num_tasks() == null_tasks + len(ts)
+
+
+ del(ts)
+ del(rcs)
+
+ assert pool.num_tasks() == null_tasks
+
+
+ # ASSERTION: We already tested that one pool behaves correctly when an error
+ # occurs - if two pools handle their ref-counts correctly, which they
+ # do if we got this far, then they should handle errors happening during
+ # task processing as expected as well. Hence we can spare that test here
+
+
+
+ @terminate_threads
+ def test_base(self):
+ max_wait_attempts = 3
+ sleep_time = 0.1
+ for mc in range(max_wait_attempts):
+ # wait for threads to die
+ if len(threading.enumerate()) != 1:
+ time.sleep(sleep_time)
+ # END for each attempt
+ assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, some are still alive" % (max_wait_attempts * sleep_time)
+
+ p = ThreadPool()
+
+ # default pools have no workers
+ assert p.size() == 0
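+
+ # with no worker threads, reads process tasks serially in the calling
+ # thread - that is the sync mode exercised by _assert_single_task(p, False)
+ # further below, where item order is asserted to match the input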
+
+ # increase and decrease the size
+ num_threads = len(threading.enumerate())
+ for i in range(self.max_threads):
+ p.set_size(i)
+ assert p.size() == i
+ assert len(threading.enumerate()) == num_threads + i
+
+ for i in range(self.max_threads, -1, -1):
+ p.set_size(i)
+ assert p.size() == i
+
+ assert p.size() == 0
+ # threads should be killed already, but we give them a tiny amount of time
+ # just to be sure
+ time.sleep(0.05)
+ assert len(threading.enumerate()) == num_threads
+
+ # SINGLE TASK SERIAL SYNC MODE
+ ##############################
+ # put a few unrelated tasks that we forget about - check ref counts and cleanup
+ t1, t2 = TestThreadTask(iter(list()), "nothing1", None), TestThreadTask(iter(list()), "nothing2", None)
+ urc1 = p.add_task(t1)
+ urc2 = p.add_task(t2)
+ assert p.num_tasks() == 2
+
+ ## SINGLE TASK #################
+ self._assert_single_task(p, False)
+ assert p.num_tasks() == 2
+ del(urc1)
+ assert p.num_tasks() == 1
+
+ p.remove_task(t2)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t2) == 2
+
+ t3 = TestChannelThreadTask(urc2, "channel", None)
+ urc3 = p.add_task(t3)
+ assert p.num_tasks() == 1
+ del(urc3)
+ assert p.num_tasks() == 0
+ assert sys.getrefcount(t3) == 2
+
+
+ # DEPENDENT TASKS SYNC MODE
+ ###########################
+ self._assert_async_dependent_tasks(p)
+
+
+ # SINGLE TASK THREADED ASYNC MODE ( 1 thread )
+ ##############################################
+ # step one gear up - just one thread for now.
+ p.set_size(1)
+ assert p.size() == 1
+ assert len(threading.enumerate()) == num_threads + 1
+ # deleting the pool stops its threads - just to be sure ;)
+ # It's not synchronized, hence we wait a moment
+ del(p)
+ time.sleep(0.05)
+ assert len(threading.enumerate()) == num_threads
+
+ p = ThreadPool(1)
+ assert len(threading.enumerate()) == num_threads + 1
+
+ # here we go
+ self._assert_single_task(p, True)
+
+
+
+ # SINGLE TASK ASYNC MODE ( 2 threads )
+ ######################################
+ # two threads to compete for a single task
+ p.set_size(2)
+ self._assert_single_task(p, True)
+
+ # real stress test - 4 threads should map natively onto every dual-core cpu
+ # with 2 hardware threads per core
+ p.set_size(4)
+ self._assert_single_task(p, True)
+
+
+ # DEPENDENT TASK ASYNC MODE
+ ###########################
+ self._assert_async_dependent_tasks(p)
+
+ print >> sys.stderr, "Done with everything"
+