diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-12 14:34:09 +0200 |
commit | 86ea63504f3e8a74cfb1d533be9d9602d2d17e27 (patch) | |
tree | a2c59af267666a4b44bda748b806585c46faae99 /lib/git/async/util.py | |
parent | f91495e271597034226f1b9651345091083172c4 (diff) | |
download | gitpython-86ea63504f3e8a74cfb1d533be9d9602d2d17e27.tar.gz |
Removed async from tree
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r-- | lib/git/async/util.py | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py deleted file mode 100644 index 4c4f3929..00000000 --- a/lib/git/async/util.py +++ /dev/null @@ -1,268 +0,0 @@ -"""Module with utilities related to async operations""" - -from threading import ( - Lock, - _allocate_lock, - _Condition, - _sleep, - _time, - ) - -from Queue import ( - Empty, - ) - -from collections import deque -import sys -import os - -#{ Routines - -def cpu_count(): - """:return:number of CPUs in the system - :note: inspired by multiprocessing""" - num = 0 - try: - if sys.platform == 'win32': - num = int(os.environ['NUMBER_OF_PROCESSORS']) - elif 'bsd' in sys.platform or sys.platform == 'darwin': - num = int(os.popen('sysctl -n hw.ncpu').read()) - else: - num = os.sysconf('SC_NPROCESSORS_ONLN') - except (ValueError, KeyError, OSError, AttributeError): - pass - # END exception handling - - if num == 0: - raise NotImplementedError('cannot determine number of cpus') - - return num - -#} END routines - - - -class DummyLock(object): - """An object providing a do-nothing lock interface for use in sync mode""" - __slots__ = tuple() - - def acquire(self): - pass - - def release(self): - pass - - -class SyncQueue(deque): - """Adapter to allow using a deque like a queue, without locking""" - def get(self, block=True, timeout=None): - try: - return self.popleft() - except IndexError: - raise Empty - # END raise empty - - def empty(self): - return len(self) == 0 - - def set_writable(self, state): - pass - - def writable(self): - return True - - def put(self, item, block=True, timeout=None): - self.append(item) - - -class HSCondition(deque): - """Cleaned up code of the original condition object in order - to make it run and respond faster.""" - __slots__ = ("_lock") - delay = 0.0002 # reduces wait times, but increases overhead - - def __init__(self, lock=None): - if lock is None: - lock = Lock() - self._lock = lock - - def release(self): - self._lock.release() - - def acquire(self, block=None): - if block is None: - self._lock.acquire() - else: - self._lock.acquire(block) - - def wait(self, timeout=None): - waiter = _allocate_lock() - waiter.acquire() # get it the first time, no blocking - self.append(waiter) - - - try: - # restore state no matter what (e.g., KeyboardInterrupt) - # now we block, as we hold the lock already - # in the momemnt we release our lock, someone else might actually resume - self._lock.release() - if timeout is None: - waiter.acquire() - else: - # Balancing act: We can't afford a pure busy loop, because of the - # GIL, so we have to sleep - # We try to sleep only tiny amounts of time though to be very responsive - # NOTE: this branch is not used by the async system anyway, but - # will be hit when the user reads with timeout - endtime = _time() + timeout - delay = self.delay - acquire = waiter.acquire - while True: - gotit = acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - # this makes 4 threads working as good as two, but of course - # it causes more frequent micro-sleeping - #delay = min(delay * 2, remaining, .05) - _sleep(delay) - # END endless loop - if not gotit: - try: - self.remove(waiter) - except ValueError: - pass - # END didn't ever get it - finally: - # reacquire the lock - self._lock.acquire() - # END assure release lock - - def notify(self, n=1): - """Its vital that this method is threadsafe - we absolutely have to - get a lock at the beginning of this method to be sure we get the - correct amount of waiters back. If we bail out, although a waiter - is about to be added, it will miss its wakeup notification, and block - forever (possibly)""" - self._lock.acquire() - try: - if not self: # len(self) == 0, but this should be faster - return - if n == 1: - try: - self.popleft().release() - except IndexError: - pass - else: - for i in range(min(n, len(self))): - self.popleft().release() - # END for each waiter to resume - # END handle n = 1 case faster - finally: - self._lock.release() - # END assure lock is released - - def notify_all(self): - self.notify(len(self)) - - -class ReadOnly(Exception): - """Thrown when trying to write to a read-only queue""" - -class AsyncQueue(deque): - """A queue using different condition objects to gain multithreading performance. - Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here. - All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', '_writable') - - def __init__(self, maxsize=0): - self.mutex = Lock() - self.not_empty = HSCondition(self.mutex) - self._writable = True - - def qsize(self): - self.mutex.acquire() - try: - return len(self) - finally: - self.mutex.release() - - def writable(self): - self.mutex.acquire() - try: - return self._writable - finally: - self.mutex.release() - - def set_writable(self, state): - """Set the writable flag of this queue to True or False - :return: The previous state""" - self.mutex.acquire() - try: - old = self._writable - self._writable = state - return old - finally: - self.mutex.release() - # if we won't receive anymore items, inform the getters - if not state: - self.not_empty.notify_all() - # END tell everyone - # END handle locking - - def empty(self): - self.mutex.acquire() - try: - return not len(self) - finally: - self.mutex.release() - - def put(self, item, block=True, timeout=None): - self.mutex.acquire() - # NOTE: we explicitly do NOT check for our writable state - # Its just used as a notification signal, and we need to be able - # to continue writing to prevent threads ( easily ) from failing - # to write their computed results, which we want in fact - # NO: we want them to fail and stop processing, as the one who caused - # the channel to close had a reason and wants the threads to - # stop on the task as soon as possible - if not self._writable: - self.mutex.release() - raise ReadOnly - # END handle read-only - self.append(item) - self.mutex.release() - self.not_empty.notify() - - def get(self, block=True, timeout=None): - self.mutex.acquire() - try: - if block: - if timeout is None: - while not len(self) and self._writable: - self.not_empty.wait() - else: - endtime = _time() + timeout - while not len(self) and self._writable: - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - # END handle timeout mode - # END handle block - - # can throw if we woke up because we are not writable anymore - try: - return self.popleft() - except IndexError: - raise Empty - # END handle unblocking reason - finally: - self.mutex.release() - # END assure lock is released - - -#} END utilities |