summaryrefslogtreecommitdiff
path: root/test/git/async/test_pool.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-11 14:37:51 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-11 14:37:51 +0200
commit5ff864138cd1e680a78522c26b583639f8f5e313 (patch)
treef78ba0335c9a620406c04837ffe60d0fff8d9531 /test/git/async/test_pool.py
parent583e6a25b0d891a2f531a81029f2bac0c237cbf9 (diff)
downloadgitpython-5ff864138cd1e680a78522c26b583639f8f5e313.tar.gz
test.async: split test_pool up into task implenetations and related utilities, as well as the tests themselves. File became too large
Diffstat (limited to 'test/git/async/test_pool.py')
-rw-r--r--test/git/async/test_pool.py186
1 files changed, 6 insertions, 180 deletions
diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py
index 5bb48cc2..0fa34f6a 100644
--- a/test/git/async/test_pool.py
+++ b/test/git/async/test_pool.py
@@ -1,196 +1,22 @@
"""Channel testing"""
from test.testlib import *
+from task import *
+
from git.async.pool import *
-from git.async.task import *
from git.async.thread import terminate_threads
from git.async.util import cpu_count
+
import threading
import weakref
import time
import sys
-class _TestTaskBase(object):
- """Note: causes great slowdown due to the required locking of task variables"""
- def __init__(self, *args, **kwargs):
- super(_TestTaskBase, self).__init__(*args, **kwargs)
- self.should_fail = False
- self.lock = threading.Lock() # yes, can't safely do x = x + 1 :)
- self.plock = threading.Lock()
- self.item_count = 0
- self.process_count = 0
-
- def do_fun(self, item):
- self.lock.acquire()
- self.item_count += 1
- self.lock.release()
- if self.should_fail:
- raise AssertionError("I am failing just for the fun of it")
- return item
-
- def process(self, count=1):
- # must do it first, otherwise we might read and check results before
- # the thread gets here :). Its a lesson !
- self.plock.acquire()
- self.process_count += 1
- self.plock.release()
- super(_TestTaskBase, self).process(count)
-
- def _assert(self, pc, fc, check_scheduled=False):
- """Assert for num process counts (pc) and num function counts (fc)
- :return: self"""
- self.lock.acquire()
- if self.item_count != fc:
- print self.item_count, fc
- assert self.item_count == fc
- self.lock.release()
- self._wlock.acquire()
- assert self._num_writers == 0
- self._wlock.release()
- return self
-
-
-class TestThreadTaskNode(_TestTaskBase, InputIteratorThreadTask):
- pass
-
-
-class TestThreadFailureNode(TestThreadTaskNode):
- """Fails after X items"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after')
- super(TestThreadFailureNode, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- item = TestThreadTaskNode.do_fun(self, item)
-
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail after
- return item
-
-
-class TestThreadInputChannelTaskNode(_TestTaskBase, InputChannelTask):
- """Apply a transformation on items read from an input channel"""
- def __init__(self, *args, **kwargs):
- self.fail_after = kwargs.pop('fail_after', 0)
- super(TestThreadInputChannelTaskNode, self).__init__(*args, **kwargs)
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelTaskNode, self).do_fun(item)
-
- # fail after support
- if self.fail_after:
- self.lock.acquire()
- try:
- if self.item_count > self.fail_after:
- raise AssertionError("Simulated failure after processing %i items" % self.fail_after)
- finally:
- self.lock.release()
- # END handle fail-after
-
- if isinstance(item, tuple):
- i = item[0]
- return item + (i * self.id, )
- else:
- return (item, item * self.id)
- # END handle tuple
-class TestThreadInputChannelVerifyTaskNode(_TestTaskBase, InputChannelTask):
- """An input channel task, which verifies the result of its input channels,
- should be last in the chain.
- Id must be int"""
-
- def do_fun(self, item):
- """return tuple(i, i*2)"""
- item = super(TestThreadInputChannelVerifyTaskNode, self).do_fun(item)
-
- # make sure the computation order matches
- assert isinstance(item, tuple), "input was no tuple: %s" % item
-
- base = item[0]
- for id, num in enumerate(item[1:]):
- assert num == base * id, "%i != %i, orig = %s" % (num, base * id, str(item))
- # END verify order
-
- return item
-
-
-
class TestThreadPool(TestBase):
max_threads = cpu_count()
-
- def _make_proxy_method(self, t):
- """required to prevent binding self into the method we call"""
- wt = weakref.proxy(t)
- return lambda item: wt.do_fun(item)
-
- def _add_task_chain(self, p, ni, count=1, fail_setup=list(), feeder_channel=None, id_offset=0):
- """Create a task chain of feeder, count transformers and order verifcator
- to the pool p, like t1 -> t2 -> t3
- :param fail_setup: a list of pairs, task_id, fail_after, i.e. [(2, 20)] would
- make the third transformer fail after 20 items
- :param feeder_channel: if set to a channel, it will be used as input of the
- first transformation task. The respective first task in the return value
- will be None.
- :param id_offset: defines the id of the first transformation task, all subsequent
- ones will add one
- :return: tuple(list(task1, taskN, ...), list(rc1, rcN, ...))"""
- nt = p.num_tasks()
-
- feeder = None
- frc = feeder_channel
- if feeder_channel is None:
- feeder = self._make_iterator_task(ni)
- frc = p.add_task(feeder)
- # END handle specific feeder
-
- rcs = [frc]
- tasks = [feeder]
-
- make_proxy_method = self._make_proxy_method
-
- inrc = frc
- for tc in xrange(count):
- t = TestThreadInputChannelTaskNode(inrc, tc+id_offset, None)
-
- t.fun = make_proxy_method(t)
- #t.fun = t.do_fun
- inrc = p.add_task(t)
-
- tasks.append(t)
- rcs.append(inrc)
- # END create count transformers
-
- # setup failure
- for id, fail_after in fail_setup:
- tasks[1+id].fail_after = fail_after
- # END setup failure
-
- verifier = TestThreadInputChannelVerifyTaskNode(inrc, 'verifier', None)
- #verifier.fun = verifier.do_fun
- verifier.fun = make_proxy_method(verifier)
- vrc = p.add_task(verifier)
-
-
- tasks.append(verifier)
- rcs.append(vrc)
- return tasks, rcs
-
- def _make_iterator_task(self, ni, taskcls=TestThreadTaskNode, **kwargs):
- """:return: task which yields ni items
- :param taskcls: the actual iterator type to use
- :param **kwargs: additional kwargs to be passed to the task"""
- t = taskcls(iter(range(ni)), 'iterator', None, **kwargs)
- t.fun = self._make_proxy_method(t)
- return t
-
def _assert_single_task(self, p, async=False):
"""Performs testing in a synchronized environment"""
# return # DEBUG TODO: Fixme deactivated it
@@ -203,7 +29,7 @@ class TestThreadPool(TestBase):
assert ni % 2 == 0, "ni needs to be dividable by 2"
assert ni % 4 == 0, "ni needs to be dividable by 4"
- make_task = lambda *args, **kwargs: self._make_iterator_task(ni, *args, **kwargs)
+ make_task = lambda *args, **kwargs: make_iterator_task(ni, *args, **kwargs)
task = make_task()
@@ -388,7 +214,7 @@ class TestThreadPool(TestBase):
ni = 1000
count = 3
aic = count + 2
- make_task = lambda *args, **kwargs: self._add_task_chain(pool, ni, count, *args, **kwargs)
+ make_task = lambda *args, **kwargs: add_task_chain(pool, ni, count, *args, **kwargs)
ts, rcs = make_task()
assert len(ts) == aic
@@ -473,7 +299,7 @@ class TestThreadPool(TestBase):
# connect verifier channel as feeder of the second pool
p2 = ThreadPool(0) # don't spawn new threads, they have the tendency not to wake up on mutexes
assert p2.size() == 0
- p2ts, p2rcs = self._add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
+ p2ts, p2rcs = add_task_chain(p2, ni, count, feeder_channel=rcs[-1], id_offset=count)
assert p2ts[0] is None # we have no feeder task
assert rcs[-1].pool_ref()() is pool # it didnt change the pool
assert rcs[-1] is p2ts[1].reader()