diff options
-rw-r--r-- | lib/git/async/util.py | 53 | ||||
-rw-r--r-- | test/git/async/test_pool.py | 6 |
2 files changed, 54 insertions, 5 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..e3556c05 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -133,12 +133,55 @@ class HSCondition(_Condition): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance""" + __slots__ = ('mutex', 'not_empty', 'queue') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) - + def qsize(self): + self.mutex.acquire() + try: + return len(self.queue) + finally: + self.mutex.release() + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() + q = self.queue + try: + if not block: + if not len(q): + raise Empty + elif timeout is None: + while not len(q): + self.not_empty.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a positive number") + else: + endtime = _time() + timeout + while not len(q): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + return q.popleft() + finally: + self.not_empty.release() + + #} END utilities diff --git a/test/git/async/test_pool.py b/test/git/async/test_pool.py index 29c13188..0d779f39 100644 --- a/test/git/async/test_pool.py +++ b/test/git/async/test_pool.py @@ -61,6 +61,12 @@ class TestThreadPool(TestBase): max_threads = cpu_count() + def _add_triple_task(self, p): + """Add a triplet of feeder, transformer and finalizer to the pool, like + t1 -> t2 -> t3, return all 3 return channels in order""" + t1 = TestThreadTaskNode(make_iter(), 'iterator', None) + # TODO: + def _assert_single_task(self, p, async=False): """Performs testing in a synchronized environment""" null_tasks = p.num_tasks() # in case we had some before |