diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 10:34:12 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 10:34:12 +0200 |
commit | 15941ca090a2c3c987324fc911bbc6f89e941c47 (patch) | |
tree | 3c508eb2e8be484e8685cddaa1de72826fbf9302 /lib/git/async/util.py | |
parent | f78d4a28f307a9d7943a06be9f919304c25ac2d9 (diff) | |
download | gitpython-15941ca090a2c3c987324fc911bbc6f89e941c47.tar.gz |
queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently.
Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r-- | lib/git/async/util.py | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py index f3213ed6..dff38f58 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -71,18 +71,15 @@ class SyncQueue(deque): class HSCondition(object): - """An attempt to make conditions less blocking, which gains performance - in return by sleeping less""" - # __slots__ = ("acquire", "release", "_lock", '_waiters') + """Cleaned up code of the original condition object in order + to make it run and respond faster.""" __slots__ = ("_lock", '_waiters') - delay = 0.00002 # reduces wait times, but increases overhead + delay = 0.0002 # reduces wait times, but increases overhead def __init__(self, lock=None): if lock is None: lock = Lock() self._lock = lock - #self.acquire = lock.acquire - #self.release = lock.release self._waiters = list() def release(self): @@ -109,6 +106,8 @@ class HSCondition(object): # Balancing act: We can't afford a pure busy loop, because of the # GIL, so we have to sleep # We try to sleep only tiny amounts of time though to be very responsive + # NOTE: this branch is not used by the async system anyway, but + # will be hit when the user reads with timeout endtime = _time() + timeout delay = self.delay acquire = waiter.acquire @@ -133,25 +132,36 @@ class HSCondition(object): finally: # reacquire the lock self.acquire() + # END assure release lock def notify(self, n=1): + """Its vital that this method is threadsafe - to be fast we don'd get a lock, + but instead rely on pseudo-atomic operations that come with the GIL. + Hence we use pop in the n=1 case to be truly atomic. + In the multi-notify case, we acquire a lock just for safety, as otherwise + we might pop too much of someone else notifies n waiters as well, which + would in the worst case lead to double-releases of locks.""" if not self._waiters: return - waiters = self._waiters if n == 1: - waiters[0].release() + # so here we assume this is thead-safe ! It wouldn't be in any other + # language, but python it is. try: - waiters.pop(0) + self._waiters.pop(0).release() except IndexError: pass else: - print "notify", waiters, n - for waiter in waiters[:n]: - waiter.release() - try: - waiters.remove(waiter) - except ValueError: - pass + self.acquire() + # once the waiter resumes, he will want to acquire the lock + # and waits again, but only until we are done, which is important + # to do that in a thread-safe fashion + try: + for i in range(min(n, len(self._waiters))): + self._waiters.pop(0).release() + # END for each waiter to resume + finally: + self.release() + # END assure we release our lock # END handle n = 1 case faster def notify_all(self): @@ -164,7 +174,8 @@ class ReadOnly(Exception): class AsyncQueue(Queue): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers - that there is nothing more to get here.""" + that there is nothing more to get here. + All default-queue code was cleaned up for performance.""" __slots__ = ('mutex', 'not_empty', 'queue', '_writable') def __init__(self, maxsize=0): @@ -222,7 +233,7 @@ class AsyncQueue(Queue): self.not_empty.notify() def get(self, block=True, timeout=None): - self.not_empty.acquire() # == self.mutex.acquire in that case + self.mutex.acquire() q = self.queue try: if block: @@ -246,7 +257,8 @@ class AsyncQueue(Queue): raise Empty # END handle unblocking reason finally: - self.not_empty.release() + self.mutex.release() + # END assure lock is released #} END utilities |