diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 14:37:51 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-11 14:37:51 +0200 |
commit | 5ff864138cd1e680a78522c26b583639f8f5e313 (patch) | |
tree | f78ba0335c9a620406c04837ffe60d0fff8d9531 /test/git/async/test_pool.py | |
parent | 583e6a25b0d891a2f531a81029f2bac0c237cbf9 (diff) | |
download | gitpython-5ff864138cd1e680a78522c26b583639f8f5e313.tar.gz |
test.async: split test_pool up into task implenetations and related utilities, as well as the tests themselves. File became too large
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r-- | test/git/async/test_pool.py | 186 |
1 files changed, 6 insertions, 180 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 5bb48cc2..0fa34f6a 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -1,196 +1,22 @@ """Channel testing""" from test.testlib import * +from task import * + from git.async.pool import * -from git.async.task import * from git.async.thread import terminate_threads from git.async.util import cpu_count + import threading import weakref import time import sys -class _TestTaskBase(object): - """Note: causes great slowdown due to the required locking of task variables""" - def __init__(self, *args, **kwargs): - super(_TestTaskBase, self).__init__(*args, **kwargs) - self.should_fail = False - self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) - self.plock = threading.Lock() - self.item_count = 0 - self.process_count = 0 - - def do_fun(self, item): - self.lock.acquire() - self.item_count += 1 - self.lock.release() - if self.should_fail: - raise AssertionError("I am failing just for the fun of it") - return item - - def process(self, count=1): - # must do it first, otherwise we might read and check results before - # the thread gets here :). Its a lesson ! - self.plock.acquire() - self.process_count += 1 - self.plock.release() - super(_TestTaskBase, self).process(count) - - def _assert(self, pc, fc, check_scheduled=False): - """Assert for num process counts (pc) and num function counts (fc) - :return: self""" - self.lock.acquire() - if self.item_count != fc: - print self.item_count, fc - assert self.item_count == fc - self.lock.release() - self._wlock.acquire() - assert self._num_writers == 0 - self._wlock.release() - return self - - -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): - pass - - -class TestThreadFailureNode(TestThreadTaskNode): - """Fails after X items""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) - - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail after - return item - - -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): - """Apply a transformation on items read from an input channel""" - def __init__(self, *args, **kwargs): - self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) - - # fail after support - if self.fail_after: - self.lock.acquire() - try: - if self.item_count > self.fail_after: - raise AssertionError("Simulated failure after processing %i items" % self.fail_after) - finally: - self.lock.release() - # END handle fail-after - - if isinstance(item, tuple): - i = item[0] - return item + (i * self.id, ) - else: - return (item, item * self.id) - # END handle tuple -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): - """An input channel task, which verifies the result of its input channels, - should be last in the chain. - Id must be int""" - - def do_fun(self, item): - """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) - - # make sure the computation order matches - assert isinstance(item, tuple), "input was no tuple: %s" % item - - base = item[0] - for id, num in enumerate(item[1:]): - assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) - # END verify order - - return item - - - class TestThreadPool(TestBase): max_threads = cpu_count() - - def _make_proxy_method(self, t): - """required to prevent binding self into the method we call""" - wt = weakref.proxy(t) - return lambda item: wt.do_fun(item) - - def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): - """Create a task chain of feeder, count transformers and order verifcator - to the pool p, like t1 -> t2 -> t3 - :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would - make the third transformer fail after 20 items - :param feeder_channel: if set to a channel, it will be used as input of the - first transformation task. The respective first task in the return value - will be None. - :param id_offset: defines the id of the first transformation task, all subsequent - ones will add one - :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" - nt = p.num_tasks() - - feeder = None - frc = feeder_channel - if feeder_channel is None: - feeder = self._make_iterator_task(ni) - frc = p.add_task(feeder) - # END handle specific feeder - - rcs = [frc] - tasks = [feeder] - - make_proxy_method = self._make_proxy_method - - inrc = frc - for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) - - t.fun = make_proxy_method(t) - #t.fun = t.do_fun - inrc = p.add_task(t) - - tasks.append(t) - rcs.append(inrc) - # END create count transformers - - # setup failure - for id, fail_after in fail_setup: - tasks[1+id].fail_after = fail_after - # END setup failure - - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) - return tasks, rcs - - def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs): - """:return: task which yields ni items - :param taskcls: the actual iterator type to use - :param **kwargs: additional kwargs to be passed to the task""" - t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = self._make_proxy_method(t) - return t - def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" # return # DEBUG TODO: Fixme deactivated it @@ -203,7 +29,7 @@ class TestThreadPool(TestBase): assert ni % 2 == 0, "ni needs to be dividable by 2" assert ni % 4 == 0, "ni needs to be dividable by 4" - make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs) + make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs) task = make_task() @@ -388,7 +214,7 @@ class TestThreadPool(TestBase): ni = 1000 count = 3 aic = count + 2 - make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs) + make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs) ts, rcs = make_task() assert len(ts) == aic @@ -473,7 +299,7 @@ class TestThreadPool(TestBase): # connect verifier channel as feeder of the second pool p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes assert p2.size() == 0 - p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) + p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count) assert p2ts[0] is None # we have no feeder task assert rcs[-1].pool_ref()() is pool # it didnt change the pool assert rcs[-1] is p2ts[1].reader() |