diff options
Diffstat (limited to 'test/git/async')
-rw-r--r-- | test/git/async/task.py | 190 | ||||
-rw-r--r-- | test/git/async/test_channel.py | 6 | ||||
-rw-r--r-- | test/git/async/test_graph.py | 45 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 343 |
4 files changed, 397 insertions, 187 deletions
diff --git a/test/git/async/task.py b/test/git/async/task.py new file mode 100644 index 00000000..9cc3cb9d --- /dev/null +++ b/test/git/async/task.py @@ -0,0 +1,190 @@ +"""Module containing task implementations useful for testing them""" +from git.async.task import * + +import threading +import weakref + +class _TestTaskBase(object): + """Note: causes great slowdown due to the required locking of task variables""" + def __init__(self, *args, **kwargs): + super(_TestTaskBase, self).__init__(*args, **kwargs) + self.should_fail = False + self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) + self.plock = threading.Lock() + self.item_count = 0 + self.process_count = 0 + + def do_fun(self, item): + self.lock.acquire() + self.item_count += 1 + self.lock.release() + if self.should_fail: + raise AssertionError("I am failing just for the fun of it") + return item + + def process(self, count=1): + # must do it first, otherwise we might read and check results before + # the thread gets here :). Its a lesson ! + self.plock.acquire() + self.process_count += 1 + self.plock.release() + super(_TestTaskBase, self).process(count) + + def _assert(self, pc, fc, check_scheduled=False): + """Assert for num process counts (pc) and num function counts (fc) + :return: self""" + self.lock.acquire() + if self.item_count != fc: + print self.item_count, fc + assert self.item_count == fc + self.lock.release() + + # NOTE: asserting num-writers fails every now and then, implying a thread is + # still processing (an empty chunk) when we are checking it. This can + # only be prevented by checking the scheduled items, which requires locking + # and causes slowdows, so we don't do that. If the num_writers + # counter wouldn't be maintained properly, more tests would fail, so + # we can safely refrain from checking this here + # self._wlock.acquire() + # assert self._num_writers == 0 + # self._wlock.release() + return self + + +class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): + pass + + +class TestThreadFailureNode(TestThreadTaskNode): + """Fails after X items""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after') + super(TestThreadFailureNode, self).__init__(*args, **kwargs) + + def do_fun(self, item): + item = TestThreadTaskNode.do_fun(self, item) + + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail after + return item + + +class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): + """Apply a transformation on items read from an input channel""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after', 0) + super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + + # fail after support + if self.fail_after: + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail-after + + if isinstance(item, tuple): + i = item[0] + return item + (i * self.id, ) + else: + return (item, item * self.id) + # END handle tuple + + +class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): + """An input channel task, which verifies the result of its input channels, + should be last in the chain. + Id must be int""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + + # make sure the computation order matches + assert isinstance(item, tuple), "input was no tuple: %s" % item + + base = item[0] + for id, num in enumerate(item[1:]): + assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) + # END verify order + + return item + + +#{ Utilities + +def make_proxy_method(t): + """required to prevent binding self into the method we call""" + wt = weakref.proxy(t) + return lambda item: wt.do_fun(item) + +def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): + """Create a task chain of feeder, count transformers and order verifcator + to the pool p, like t1 -> t2 -> t3 + :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would + make the third transformer fail after 20 items + :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value + will be None. + :param id_offset: defines the id of the first transformation task, all subsequent + ones will add one + :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" + nt = p.num_tasks() + + feeder = None + frc = feeder_channel + if feeder_channel is None: + feeder = make_iterator_task(ni) + frc = p.add_task(feeder) + # END handle specific feeder + + rcs = [frc] + tasks = [feeder] + + inrc = frc + for tc in xrange(count): + t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + + t.fun = make_proxy_method(t) + #t.fun = t.do_fun + inrc = p.add_task(t) + + tasks.append(t) + rcs.append(inrc) + # END create count transformers + + # setup failure + for id, fail_after in fail_setup: + tasks[1+id].fail_after = fail_after + # END setup failure + + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) + vrc = p.add_task(verifier) + + + tasks.append(verifier) + rcs.append(vrc) + return tasks, rcs + +def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): + """:return: task which yields ni items + :param taskcls: the actual iterator type to use + :param **kwargs: additional kwargs to be passed to the task""" + t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) + t.fun = make_proxy_method(t) + return t + +#} END utilities diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py index 215081cd..a24c7c91 100644 --- a/test/git/async/test_channel.py +++ b/test/git/async/test_channel.py @@ -9,8 +9,8 @@ class TestChannels(TestBase): def test_base(self): # creating channel yields a write and a read channal wc, rc = mkchannel() - assert isinstance(wc, WChannel) # default args - assert isinstance(rc, RChannel) + assert isinstance(wc, Writer) # default args + assert isinstance(rc, Reader) # TEST UNLIMITED SIZE CHANNEL - writing+reading is FIFO @@ -46,7 +46,7 @@ class TestChannels(TestBase): # test callback channels - wc, rc = mkchannel(wctype = CallbackWChannel, rctype = CallbackRChannel) + wc, rc = mkchannel(wtype = CallbackWriter, rtype = CallbackReader) cb = [0, 0] # set slots to one if called def pre_write(item): diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py index 1a153e2d..7630226b 100644 --- a/test/git/async/test_graph.py +++ b/test/git/async/test_graph.py @@ -3,6 +3,7 @@ from test.testlib import * from git.async.graph import * import time +import sys class TestGraph(TestBase): @@ -19,7 +20,7 @@ class TestGraph(TestBase): # delete unconnected nodes for n in g.nodes[:]: - g.del_node(n) + g.remove_node(n) # END del nodes # add a chain of connected nodes @@ -54,38 +55,26 @@ class TestGraph(TestBase): # deleting a connected node clears its neighbour connections assert n3.in_nodes[0] is n2 - assert g.del_node(n2) is g - assert g.del_node(n2) is g # multi-deletion okay + assert g.remove_node(n2) is g + assert g.remove_node(n2) is g # multi-deletion okay assert len(g.nodes) == nn - 1 assert len(n3.in_nodes) == 0 assert len(n1.out_nodes) == 0 # check the history from the last node - last = g.nodes[-1] - class Visitor(object): - def __init__(self, origin): - self.origin_seen = False - self.origin = origin - self.num_seen = 0 - - def __call__(self, n): - if n is self.origin: - self.origin_seen = True - else: - assert not self.origin_seen, "should see origin last" - # END check origin - self.num_seen += 1 - return True - - def _assert(self, num_expected): - assert self.origin_seen - assert self.num_seen == num_expected - # END visitor helper - end = g.nodes[-1] - visitor = Visitor(end) - g.visit_input_inclusive_depth_first(end, visitor) - + dfirst_nodes = g.input_inclusive_dfirst_reversed(end) num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected - visitor._assert(num_nodes_seen) + assert len(dfirst_nodes) == num_nodes_seen + assert dfirst_nodes[-1] == end and dfirst_nodes[-2].id == end.id-1 + + # test cleanup + # its at least kept by its graph + assert sys.getrefcount(end) > 3 + del(g) + del(n1); del(n2); del(n3) + del(dfirst_nodes) + del(last) + del(n) + assert sys.getrefcount(end) == 2 diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 2a5e4647..40c6d66e 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -1,169 +1,43 @@ """Channel testing""" from test.testlib import * +from task import * + from git.async.pool import * -from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count + import threading +import weakref import time import sys -class _TestTaskBase(object): - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). Its a lesson ! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - - return self - -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): - pass - -class TestThreadFailureNode(TestThreadTaskNode): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - return item - - -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): - """Apply a transformation on items read from an input channel""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple - - -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. - Id must be int""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple) - - base = item[0] - for num in item[1:]: - assert num == base * 2 - base = num - # END verify order - - return item - - class TestThreadPool(TestBase): max_threads = cpu_count() - def _add_task_chain(self, p, ni, count=1): - """Create a task chain of feeder, count transformers and order verifcator - to the pool p, like t1 -> t2 -> t3 - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - - assert p.num_tasks() == nt + 1 - - rcs = [frc] - tasks = [feeder] - - inrc = frc - for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc, None) - t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - assert p.num_tasks() == nt + 2 + tc - # END create count transformers - - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - verifier.fun = verifier.do_fun - vrc = p.add_task(verifier) - - assert p.num_tasks() == nt + tc + 3 - - tasks.append(verifier) - rcs.append(vrc) - return tasks, rcs - - def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = t.do_fun - return t - def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" + # return # DEBUG TODO: Fixme deactivated it print >> sys.stderr, "Threadpool: Starting single task (async = %i) with %i threads" % (async, p.size()) null_tasks = p.num_tasks() # in case we had some before # add a simple task # it iterates n items - ni = 5000 + ni = 1000 assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) task = make_task() assert p.num_tasks() == null_tasks rc = p.add_task(task) assert p.num_tasks() == 1 + null_tasks - assert isinstance(rc, RPoolChannel) - assert task._out_wc is not None + assert isinstance(rc, PoolReader) + assert task._out_writer is not None # pull the result completely - we should get one task, which calls its # function once. In sync mode, the order matches @@ -258,8 +132,13 @@ class TestThreadPool(TestBase): assert len(items) == nri task._assert( 5, ni) - assert task.is_done() + + # delete the handle first, causing the task to be removed and to be set + # done. We check for the set-done state later. Depending on the timing, + # The task is not yet set done when we are checking it because we were + # scheduled in before the flag could be set. del(rc) + assert task.is_done() assert p.num_tasks() == null_tasks # depleted # but this only hits if we want too many items, if we want less, it could @@ -312,8 +191,6 @@ class TestThreadPool(TestBase): print "read(0) with failure" assert len(rc.read()) == 0 # failure on first item - print >> sys.stderr, "done with everything" - assert isinstance(task.error(), AssertionError) assert task.is_done() # on error, its marked done as well del(rc) @@ -328,40 +205,181 @@ class TestThreadPool(TestBase): assert task.is_done() assert isinstance(task.error(), AssertionError) + print >> sys.stderr, "done with everything" + def _assert_async_dependent_tasks(self, pool): # includes failure in center task, 'recursive' orphan cleanup # This will also verify that the channel-close mechanism works # t1 -> t2 -> t3 - # t1 -> x -> t3 + + print >> sys.stderr, "Threadpool: starting async dependency test in %i threads" % pool.size() null_tasks = pool.num_tasks() - ni = 100 - count = 1 - make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + ni = 1000 + count = 3 + aic = count + 2 + make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() - assert len(ts) == count + 2 - assert len(rcs) == count + 2 + assert len(ts) == aic + assert len(rcs) == aic assert pool.num_tasks() == null_tasks + len(ts) - print pool._tasks.nodes + # read(0) + ######### + st = time.time() + items = rcs[-1].read() + elapsed = time.time() - st + print len(items), ni + assert len(items) == ni + del(rcs) + assert pool.num_tasks() == 0 # tasks depleted, all done, no handles + # wait a tiny moment - there could still be something unprocessed on the + # queue, increasing the refcount + time.sleep(0.15) + assert sys.getrefcount(ts[-1]) == 2 # ts + call + assert sys.getrefcount(ts[0]) == 2 # ts + call + print >> sys.stderr, "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed, ni / elapsed) - # in the end, we expect all tasks to be gone, automatically + # read(1) + ######### + ts, rcs = make_task() + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to pull + elapsed_single = time.time() - st + # another read yields nothing, its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1) of %i dependent in %f s ( %i items / s )" % (ni, aic, elapsed_single, ni / elapsed_single) + + + # read with min-count size + ########################### + # must be faster, as it will read ni / 4 chunks + # Its enough to set one task, as it will force all others in the chain + # to min_size as well. + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + nri = ni / 4 + ts[-1].min_count = nri + st = time.time() + for i in xrange(ni): + items = rcs[-1].read(1) + assert len(items) == 1 + # END for each item to read + elapsed_minsize = time.time() - st + # its empty + assert len(rcs[-1].read()) == 0 + print >> sys.stderr, "Dependent Tasks: evaluated %i items with read(1), min_size=%i, of %i dependent in %f s ( %i items / s )" % (ni, nri, aic, elapsed_minsize, ni / elapsed_minsize) + + # it should have been a bit faster at least, and most of the time it is + # Sometimes, its not, mainly because: + # * The test tasks lock a lot, hence they slow down the system + # * Each read will still trigger the pool to evaluate, causing some overhead + # even though there are enough items on the queue in that case. Keeping + # track of the scheduled items helped there, but it caused further inacceptable + # slowdown + # assert elapsed_minsize < elapsed_single + + + # read with failure + ################### + # it should recover and give at least fail_after items + # t1 -> x -> t3 + fail_after = ni/2 + ts, rcs = make_task(fail_setup=[(0, fail_after)]) + items = rcs[-1].read() + assert len(items) == fail_after + + + # MULTI-POOL + # If two pools are connected, this shold work as well. + # The second one has just one more thread + ts, rcs = make_task() + + # connect verifier channel as feeder of the second pool + p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes + assert p2.size() == 0 + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2ts[0] is None # we have no feeder task + assert rcs[-1].pool_ref()() is pool # it didnt change the pool + assert rcs[-1] is p2ts[1].reader() + assert p2.num_tasks() == len(p2ts)-1 # first is None + + # reading from the last one will evaluate all pools correctly + print "read(0) multi-pool" + st = time.time() + items = p2rcs[-1].read() + elapsed = time.time() - st + assert len(items) == ni + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(0), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + + # loose the handles of the second pool to allow others to go as well + del(p2rcs); del(p2ts) + assert p2.num_tasks() == 0 + + # now we lost our old handles as well, and the tasks go away + ts, rcs = make_task() + assert pool.num_tasks() == len(ts) + + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + assert p2.num_tasks() == len(p2ts) - 1 + + # Test multi-read(1) + print "read(1) * %i" % ni + reader = rcs[-1] + st = time.time() + for i in xrange(ni): + items = reader.read(1) + assert len(items) == 1 + # END for each item to get + elapsed = time.time() - st + del(reader) # decrement refcount + + print >> sys.stderr, "Dependent Tasks: evaluated 2 connected pools and %i items with read(1), of %i dependent tasks in %f s ( %i items / s )" % (ni, aic + aic-1, elapsed, ni / elapsed) + + # another read is empty + assert len(rcs[-1].read()) == 0 + + # now that both are connected, I can drop my handle to the reader + # without affecting the task-count, but whats more important: + # They remove their tasks correctly once we drop our references in the + # right order + del(p2ts) + assert p2rcs[0] is rcs[-1] + del(p2rcs) + assert p2.num_tasks() == 0 + del(p2) + + assert pool.num_tasks() == null_tasks + len(ts) - # order of deletion matters - just keep the end, then delete - final_rc = rcs[-1] del(ts) del(rcs) - del(final_rc) - assert pool.num_tasks() == null_tasks + + assert pool.num_tasks() == null_tasks + + + # ASSERTION: We already tested that one pool behaves correctly when an error + # occours - if two pools handle their ref-counts correctly, which they + # do if we are here, then they should handle errors happening during + # the task processing as expected as well. Hence we can safe this here + - @terminate_threads + # for some reason, sometimes it has multiple workerthreads already when he + # enters the method ... dunno yet, pools should clean up themselvess + #@terminate_threads def test_base(self): + assert len(threading.enumerate()) == 1 + p = ThreadPool() # default pools have no workers @@ -386,17 +404,28 @@ class TestThreadPool(TestBase): # SINGLE TASK SERIAL SYNC MODE ############################## - # put a few unrelated tasks that we forget about - urc1 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) - urc2 = p.add_task(TestThreadTaskNode(iter(list()), "nothing", None)) + # put a few unrelated tasks that we forget about - check ref counts and cleanup + t1, t2 = TestThreadTaskNode(iter(list()), "nothing1", None), TestThreadTaskNode(iter(list()), "nothing2", None) + urc1 = p.add_task(t1) + urc2 = p.add_task(t2) assert p.num_tasks() == 2 ## SINGLE TASK ################# self._assert_single_task(p, False) assert p.num_tasks() == 2 del(urc1) - del(urc2) + assert p.num_tasks() == 1 + + p.remove_task(t2) + assert p.num_tasks() == 0 + assert sys.getrefcount(t2) == 2 + + t3 = TestThreadInputChannelTaskNode(urc2, "channel", None) + urc3 = p.add_task(t3) + assert p.num_tasks() == 1 + del(urc3) assert p.num_tasks() == 0 + assert sys.getrefcount(t3) == 2 # DEPENDENT TASKS SYNC MODE @@ -438,4 +467,6 @@ class TestThreadPool(TestBase): ########################### self._assert_async_dependent_tasks(p) - + print >> sys.stderr, "Done with everything" + + # TODO: test multi-pool connections |