diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 19:12:44 +0200 |
commit | 583cd8807259a69fc01874b798f657c1f9ab7828 (patch) | |
tree | 046847c4dcd33f5b30c00ff65770039fc72dd148 /lib/git/async/util.py | |
parent | edd9e23c766cfd51b3a6f6eee5aac0b791ef2fd0 (diff) | |
download | gitpython-583cd8807259a69fc01874b798f657c1f9ab7828.tar.gz |
Moved pool utilities into util module, fixed critical issue that caused havok - lets call this a safe-state
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r-- | lib/git/async/util.py | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dabd8a42..432d1736 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -1,8 +1,23 @@ """Module with utilities related to async operations""" +from threading import ( + Lock, + _Condition, + _sleep, + _time, + ) + +from Queue import ( + Queue, + Empty, + ) + +from collections import deque import sys import os +#{ Routines + def cpu_count(): """:return:number of CPUs in the system :note: inspired by multiprocessing""" @@ -22,3 +37,94 @@ def cpu_count(): raise NotImplementedError('cannot determine number of cpus') return num + +#} END routines + + +class SyncQueue(deque): + """Adapter to allow using a deque like a queue, without locking""" + def get(self, block=True, timeout=None): + try: + return self.pop() + except IndexError: + raise Empty + # END raise empty + + def empty(self): + return len(self) == 0 + + put = deque.append + + +class HSCondition(_Condition): + """An attempt to make conditions less blocking, which gains performance + in return by sleeping less""" + delay = 0.00002 # reduces wait times, but increases overhead + + def wait(self, timeout=None): + waiter = Lock() + waiter.acquire() + self.__dict__['_Condition__waiters'].append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = self.delay + acquire = waiter.acquire + while True: + gotit = acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + # END endless loop + if not gotit: + try: + self.__dict__['_Condition__waiters'].remove(waiter) + except ValueError: + pass + # END didn't ever get it + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + __waiters = self.__dict__['_Condition__waiters'] + if not __waiters: + return + if n == 1: + __waiters[0].release() + try: + __waiters.pop(0) + except IndexError: + pass + else: + waiters = __waiters[:n] + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + # END handle n = 1 case faster + +class AsyncQueue(Queue): + """A queue using different condition objects to gain multithreading performance""" + def __init__(self, maxsize=0): + Queue.__init__(self, maxsize) + + self.not_empty = HSCondition(self.mutex) + self.not_full = HSCondition(self.mutex) + self.all_tasks_done = HSCondition(self.mutex) + + +#} END utilities |