diff options
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r-- | lib/git/async/util.py | 164 |
1 files changed, 136 insertions, 28 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 55766579..f3213ed6 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -2,6 +2,8 @@ from threading import ( Lock, + current_thread, + _allocate_lock, _Condition, _sleep, _time, @@ -57,36 +59,56 @@ class SyncQueue(deque): """Adapter to allow using a deque like a queue, without locking""" def get(self, block=True, timeout=None): try: - return self.pop() + return self.popleft() except IndexError: raise Empty # END raise empty - + def empty(self): return len(self) == 0 put = deque.append - -class HSCondition(_Condition): + +class HSCondition(object): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" + # __slots__ = ("acquire", "release", "_lock", '_waiters') + __slots__ = ("_lock", '_waiters') delay = 0.00002 # reduces wait times, but increases overhead + def __init__(self, lock=None): + if lock is None: + lock = Lock() + self._lock = lock + #self.acquire = lock.acquire + #self.release = lock.release + self._waiters = list() + + def release(self): + self._lock.release() + + def acquire(self, block=None): + if block is None: + self._lock.acquire() + else: + self._lock.acquire(block) + def wait(self, timeout=None): - waiter = Lock() - waiter.acquire() - self.__dict__['_Condition__waiters'].append(waiter) - saved_state = self._release_save() + waiter = _allocate_lock() + waiter.acquire() # get it the first time, no blocking + self._waiters.append(waiter) + + # in the momemnt we release our lock, someone else might actually resume + self.release() try: # restore state no matter what (e.g., KeyboardInterrupt) + # now we block, as we hold the lock already if timeout is None: waiter.acquire() else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). + # Balancing act: We can't afford a pure busy loop, because of the + # GIL, so we have to sleep + # We try to sleep only tiny amounts of time though to be very responsive endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -104,41 +126,127 @@ class HSCondition(_Condition): # END endless loop if not gotit: try: - self.__dict__['_Condition__waiters'].remove(waiter) + self._waiters.remove(waiter) except ValueError: pass # END didn't ever get it finally: - self._acquire_restore(saved_state) + # reacquire the lock + self.acquire() def notify(self, n=1): - __waiters = self.__dict__['_Condition__waiters'] - if not __waiters: + if not self._waiters: return + waiters = self._waiters if n == 1: - __waiters[0].release() + waiters[0].release() try: - __waiters.pop(0) + waiters.pop(0) except IndexError: pass else: - waiters = __waiters[:n] - for waiter in waiters: + print "notify", waiters, n + for waiter in waiters[:n]: waiter.release() try: - __waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass # END handle n = 1 case faster + def notify_all(self): + self.notify(len(self._waiters)) + + +class ReadOnly(Exception): + """Thrown when trying to write to a read-only queue""" + class AsyncQueue(Queue): - """A queue using different condition objects to gain multithreading performance""" + """A queue using different condition objects to gain multithreading performance. + Additionally it has a threadsafe writable flag, which will alert all readers + that there is nothing more to get here.""" + __slots__ = ('mutex', 'not_empty', 'queue', '_writable') + def __init__(self, maxsize=0): - Queue.__init__(self, maxsize) - + self.queue = deque() + self.mutex = Lock() self.not_empty = HSCondition(self.mutex) - self.not_full = HSCondition(self.mutex) - self.all_tasks_done = HSCondition(self.mutex) + self._writable = True - + def qsize(self): + self.mutex.acquire() + try: + return len(self.queue) + finally: + self.mutex.release() + + def writable(self): + self.mutex.acquire() + try: + return self._writable + finally: + self.mutex.release() + + def set_writable(self, state): + """Set the writable flag of this queue to True or False + :return: The previous state""" + self.mutex.acquire() + try: + old = self._writable + self._writable = state + return old + finally: + self.mutex.release() + + # if we won't receive anymore items, inform the getters + if not state: + self.not_empty.notify_all() + # END tell everyone + # END handle locking + + def empty(self): + self.mutex.acquire() + try: + return not len(self.queue) + finally: + self.mutex.release() + + def put(self, item, block=True, timeout=None): + self.mutex.acquire() + if not self._writable: + self.mutex.release() + raise ReadOnly + # END handle read-only + self.queue.append(item) + self.mutex.release() + self.not_empty.notify() + + def get(self, block=True, timeout=None): + self.not_empty.acquire() # == self.mutex.acquire in that case + q = self.queue + try: + if block: + if timeout is None: + while not len(q) and self._writable: + self.not_empty.wait() + else: + endtime = _time() + timeout + while not len(q) and self._writable: + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + # END handle timeout mode + # END handle block + + # can throw if we woke up because we are not writable anymore + try: + return q.popleft() + except IndexError: + raise Empty + # END handle unblocking reason + finally: + self.not_empty.release() + + #} END utilities |