From 5ff864138cd1e680a78522c26b583639f8f5e313 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 14:37:51 +0200 Subject: test.async: split test_pool up into task implenetations and related utilities, as well as the tests themselves. File became too large --- test/git/async/task.py | 190 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 test/git/async/task.py (limited to 'test/git/async/task.py') diff --git a/test/git/async/task.py b/test/git/async/task.py new file mode 100644 index 00000000..9cc3cb9d --- /dev/null +++ b/test/git/async/task.py @@ -0,0 +1,190 @@ +"""Module containing task implementations useful for testing them""" +from git.async.task import * + +import threading +import weakref + +class _TestTaskBase(object): + """Note: causes great slowdown due to the required locking of task variables""" + def __init__(self, *args, **kwargs): + super(_TestTaskBase, self).__init__(*args, **kwargs) + self.should_fail = False + self.lock = threading.Lock() # yes, can't safely do x = x + 1 :) + self.plock = threading.Lock() + self.item_count = 0 + self.process_count = 0 + + def do_fun(self, item): + self.lock.acquire() + self.item_count += 1 + self.lock.release() + if self.should_fail: + raise AssertionError("I am failing just for the fun of it") + return item + + def process(self, count=1): + # must do it first, otherwise we might read and check results before + # the thread gets here :). Its a lesson ! + self.plock.acquire() + self.process_count += 1 + self.plock.release() + super(_TestTaskBase, self).process(count) + + def _assert(self, pc, fc, check_scheduled=False): + """Assert for num process counts (pc) and num function counts (fc) + :return: self""" + self.lock.acquire() + if self.item_count != fc: + print self.item_count, fc + assert self.item_count == fc + self.lock.release() + + # NOTE: asserting num-writers fails every now and then, implying a thread is + # still processing (an empty chunk) when we are checking it. This can + # only be prevented by checking the scheduled items, which requires locking + # and causes slowdows, so we don't do that. If the num_writers + # counter wouldn't be maintained properly, more tests would fail, so + # we can safely refrain from checking this here + # self._wlock.acquire() + # assert self._num_writers == 0 + # self._wlock.release() + return self + + +class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): + pass + + +class TestThreadFailureNode(TestThreadTaskNode): + """Fails after X items""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after') + super(TestThreadFailureNode, self).__init__(*args, **kwargs) + + def do_fun(self, item): + item = TestThreadTaskNode.do_fun(self, item) + + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail after + return item + + +class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): + """Apply a transformation on items read from an input channel""" + def __init__(self, *args, **kwargs): + self.fail_after = kwargs.pop('fail_after', 0) + super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + + # fail after support + if self.fail_after: + self.lock.acquire() + try: + if self.item_count > self.fail_after: + raise AssertionError("Simulated failure after processing %i items" % self.fail_after) + finally: + self.lock.release() + # END handle fail-after + + if isinstance(item, tuple): + i = item[0] + return item + (i * self.id, ) + else: + return (item, item * self.id) + # END handle tuple + + +class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): + """An input channel task, which verifies the result of its input channels, + should be last in the chain. + Id must be int""" + + def do_fun(self, item): + """return tuple(i, i*2)""" + item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + + # make sure the computation order matches + assert isinstance(item, tuple), "input was no tuple: %s" % item + + base = item[0] + for id, num in enumerate(item[1:]): + assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item)) + # END verify order + + return item + + +#{ Utilities + +def make_proxy_method(t): + """required to prevent binding self into the method we call""" + wt = weakref.proxy(t) + return lambda item: wt.do_fun(item) + +def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): + """Create a task chain of feeder, count transformers and order verifcator + to the pool p, like t1 -> t2 -> t3 + :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would + make the third transformer fail after 20 items + :param feeder_channel: if set to a channel, it will be used as input of the + first transformation task. The respective first task in the return value + will be None. + :param id_offset: defines the id of the first transformation task, all subsequent + ones will add one + :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))""" + nt = p.num_tasks() + + feeder = None + frc = feeder_channel + if feeder_channel is None: + feeder = make_iterator_task(ni) + frc = p.add_task(feeder) + # END handle specific feeder + + rcs = [frc] + tasks = [feeder] + + inrc = frc + for tc in xrange(count): + t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + + t.fun = make_proxy_method(t) + #t.fun = t.do_fun + inrc = p.add_task(t) + + tasks.append(t) + rcs.append(inrc) + # END create count transformers + + # setup failure + for id, fail_after in fail_setup: + tasks[1+id].fail_after = fail_after + # END setup failure + + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) + vrc = p.add_task(verifier) + + + tasks.append(verifier) + rcs.append(vrc) + return tasks, rcs + +def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): + """:return: task which yields ni items + :param taskcls: the actual iterator type to use + :param **kwargs: additional kwargs to be passed to the task""" + t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) + t.fun = make_proxy_method(t) + return t + +#} END utilities -- cgit v1.2.1 From a28942bdf01f4ddb9d0b5a0489bd6f4e101dd775 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 20:13:21 +0200 Subject: Added performance test, improved iterator task which will now be usable by default. It shows that there must be the notion of a producer, which can work if there are no items read --- test/git/async/task.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) (limited to 'test/git/async/task.py') diff --git a/test/git/async/task.py b/test/git/async/task.py index 9cc3cb9d..f3599efe 100644 --- a/test/git/async/task.py +++ b/test/git/async/task.py @@ -102,6 +102,14 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): # END handle tuple +class TestThreadPerformanceTaskNode(InputChannelTask): + """Applies no operation to the item, and does not lock, measuring + the actual throughput of the system""" + + def do_fun(self, item): + return item + + class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): """An input channel task, which verifies the result of its input channels, should be last in the chain. @@ -121,7 +129,6 @@ class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): return item - #{ Utilities def make_proxy_method(t): @@ -129,7 +136,9 @@ def make_proxy_method(t): wt = weakref.proxy(t) return lambda item: wt.do_fun(item) -def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0): +def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, + feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode, + include_verifier=True): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would @@ -145,7 +154,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of feeder = None frc = feeder_channel if feeder_channel is None: - feeder = make_iterator_task(ni) + feeder = make_iterator_task(ni, taskcls=feedercls) frc = p.add_task(feeder) # END handle specific feeder @@ -154,7 +163,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of inrc = frc for tc in xrange(count): - t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None) + t = transformercls(inrc, tc+id_offset, None) t.fun = make_proxy_method(t) #t.fun = t.do_fun @@ -169,14 +178,16 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of tasks[1+id].fail_after = fail_after # END setup failure - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) - #verifier.fun = verifier.do_fun - verifier.fun = make_proxy_method(verifier) - vrc = p.add_task(verifier) - - - tasks.append(verifier) - rcs.append(vrc) + if include_verifier: + verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + #verifier.fun = verifier.do_fun + verifier.fun = make_proxy_method(verifier) + vrc = p.add_task(verifier) + + + tasks.append(verifier) + rcs.append(vrc) + # END handle include verifier return tasks, rcs def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): @@ -184,7 +195,8 @@ def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" t = taskcls(iter(range(ni)), 'iterator', None, **kwargs) - t.fun = make_proxy_method(t) + if isinstance(t, _TestTaskBase): + t.fun = make_proxy_method(t) return t #} END utilities -- cgit v1.2.1 From 7a0b79ee574999ecbc76696506352e4a5a0d7159 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Jun 2010 12:38:02 +0200 Subject: task: improved naming of task types, improved pool test to be less dependent on starting with just the main thread --- test/git/async/task.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) (limited to 'test/git/async/task.py') diff --git a/test/git/async/task.py b/test/git/async/task.py index f3599efe..583cb1f8 100644 --- a/test/git/async/task.py +++ b/test/git/async/task.py @@ -51,18 +51,18 @@ class _TestTaskBase(object): return self -class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask): +class TestThreadTask(_TestTaskBase, IteratorThreadTask): pass -class TestThreadFailureNode(TestThreadTaskNode): +class TestFailureThreadTask(TestThreadTask): """Fails after X items""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after') - super(TestThreadFailureNode, self).__init__(*args, **kwargs) + super(TestFailureThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): - item = TestThreadTaskNode.do_fun(self, item) + item = TestThreadTask.do_fun(self, item) self.lock.acquire() try: @@ -74,15 +74,15 @@ class TestThreadFailureNode(TestThreadTaskNode): return item -class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): +class TestChannelThreadTask(_TestTaskBase, ChannelThreadTask): """Apply a transformation on items read from an input channel""" def __init__(self, *args, **kwargs): self.fail_after = kwargs.pop('fail_after', 0) - super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs) + super(TestChannelThreadTask, self).__init__(*args, **kwargs) def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelTaskNode, self).do_fun(item) + item = super(TestChannelThreadTask, self).do_fun(item) # fail after support if self.fail_after: @@ -102,7 +102,7 @@ class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask): # END handle tuple -class TestThreadPerformanceTaskNode(InputChannelTask): +class TestPerformanceThreadTask(ChannelThreadTask): """Applies no operation to the item, and does not lock, measuring the actual throughput of the system""" @@ -110,14 +110,14 @@ class TestThreadPerformanceTaskNode(InputChannelTask): return item -class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask): +class TestVerifyChannelThreadTask(_TestTaskBase, ChannelThreadTask): """An input channel task, which verifies the result of its input channels, should be last in the chain. Id must be int""" def do_fun(self, item): """return tuple(i, i*2)""" - item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item) + item = super(TestVerifyChannelThreadTask, self).do_fun(item) # make sure the computation order matches assert isinstance(item, tuple), "input was no tuple: %s" % item @@ -137,7 +137,7 @@ def make_proxy_method(t): return lambda item: wt.do_fun(item) def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0, - feedercls=TestThreadTaskNode, transformercls=TestThreadInputChannelTaskNode, + feedercls=TestThreadTask, transformercls=TestChannelThreadTask, include_verifier=True): """Create a task chain of feeder, count transformers and order verifcator to the pool p, like t1 -> t2 -> t3 @@ -179,7 +179,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END setup failure if include_verifier: - verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None) + verifier = TestVerifyChannelThreadTask(inrc, 'verifier', None) #verifier.fun = verifier.do_fun verifier.fun = make_proxy_method(verifier) vrc = p.add_task(verifier) @@ -190,7 +190,7 @@ def add_task_chain(p, ni, count=1, fail_setup=list(), feeder_channel=None, id_of # END handle include verifier return tasks, rcs -def make_iterator_task(ni, taskcls=TestThreadTaskNode, **kwargs): +def make_iterator_task(ni, taskcls=TestThreadTask, **kwargs): """:return: task which yields ni items :param taskcls: the actual iterator type to use :param **kwargs: additional kwargs to be passed to the task""" -- cgit v1.2.1