summaryrefslogtreecommitdiff
path: root/lib/git/async/util.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 10:34:12 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 10:34:12 +0200
commit15941ca090a2c3c987324fc911bbc6f89e941c47 (patch)
tree3c508eb2e8be484e8685cddaa1de72826fbf9302 /lib/git/async/util.py
parentf78d4a28f307a9d7943a06be9f919304c25ac2d9 (diff)
downloadgitpython-15941ca090a2c3c987324fc911bbc6f89e941c47.tar.gz
queue: fixed critical bug in the notify method, as it was not at all thread-safe, causing locks to be released multiple times. Now it runs very fast, and very stable apparently.
Now its about putting previous features back in, and studying their results, before more complex task graphs can be examined
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r--lib/git/async/util.py50
1 files changed, 31 insertions, 19 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index f3213ed6..dff38f58 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -71,18 +71,15 @@ class SyncQueue(deque):
class HSCondition(object):
- """An attempt to make conditions less blocking, which gains performance
- in return by sleeping less"""
- # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ """Cleaned up code of the original condition object in order
+ to make it run and respond faster."""
__slots__ = ("_lock", '_waiters')
- delay = 0.00002 # reduces wait times, but increases overhead
+ delay = 0.0002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- #self.acquire = lock.acquire
- #self.release = lock.release
self._waiters = list()
def release(self):
@@ -109,6 +106,8 @@ class HSCondition(object):
# Balancing act: We can't afford a pure busy loop, because of the
# GIL, so we have to sleep
# We try to sleep only tiny amounts of time though to be very responsive
+ # NOTE: this branch is not used by the async system anyway, but
+ # will be hit when the user reads with timeout
endtime = _time() + timeout
delay = self.delay
acquire = waiter.acquire
@@ -133,25 +132,36 @@ class HSCondition(object):
finally:
# reacquire the lock
self.acquire()
+ # END assure release lock
def notify(self, n=1):
+ """Its vital that this method is threadsafe - to be fast we don'd get a lock,
+ but instead rely on pseudo-atomic operations that come with the GIL.
+ Hence we use pop in the n=1 case to be truly atomic.
+ In the multi-notify case, we acquire a lock just for safety, as otherwise
+ we might pop too much of someone else notifies n waiters as well, which
+ would in the worst case lead to double-releases of locks."""
if not self._waiters:
return
- waiters = self._waiters
if n == 1:
- waiters[0].release()
+ # so here we assume this is thead-safe ! It wouldn't be in any other
+ # language, but python it is.
try:
- waiters.pop(0)
+ self._waiters.pop(0).release()
except IndexError:
pass
else:
- print "notify", waiters, n
- for waiter in waiters[:n]:
- waiter.release()
- try:
- waiters.remove(waiter)
- except ValueError:
- pass
+ self.acquire()
+ # once the waiter resumes, he will want to acquire the lock
+ # and waits again, but only until we are done, which is important
+ # to do that in a thread-safe fashion
+ try:
+ for i in range(min(n, len(self._waiters))):
+ self._waiters.pop(0).release()
+ # END for each waiter to resume
+ finally:
+ self.release()
+ # END assure we release our lock
# END handle n = 1 case faster
def notify_all(self):
@@ -164,7 +174,8 @@ class ReadOnly(Exception):
class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance.
Additionally it has a threadsafe writable flag, which will alert all readers
- that there is nothing more to get here."""
+ that there is nothing more to get here.
+ All default-queue code was cleaned up for performance."""
__slots__ = ('mutex', 'not_empty', 'queue', '_writable')
def __init__(self, maxsize=0):
@@ -222,7 +233,7 @@ class AsyncQueue(Queue):
self.not_empty.notify()
def get(self, block=True, timeout=None):
- self.not_empty.acquire() # == self.mutex.acquire in that case
+ self.mutex.acquire()
q = self.queue
try:
if block:
@@ -246,7 +257,8 @@ class AsyncQueue(Queue):
raise Empty
# END handle unblocking reason
finally:
- self.not_empty.release()
+ self.mutex.release()
+ # END assure lock is released
#} END utilities