summaryrefslogtreecommitdiff
path: root/lib/git/async/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r--lib/git/async/util.py50
1 files changed, 31 insertions, 19 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index f3213ed6..dff38f58 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -71,18 +71,15 @@ class SyncQueue(deque):
class HSCondition(object):
- """An attempt to make conditions less blocking, which gains performance
- in return by sleeping less"""
- # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ """Cleaned up code of the original condition object in order
+ to make it run and respond faster."""
__slots__ = ("_lock", '_waiters')
- delay = 0.00002 # reduces wait times, but increases overhead
+ delay = 0.0002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- #self.acquire = lock.acquire
- #self.release = lock.release
self._waiters = list()
def release(self):
@@ -109,6 +106,8 @@ class HSCondition(object):
# Balancing act: We can't afford a pure busy loop, because of the
# GIL, so we have to sleep
# We try to sleep only tiny amounts of time though to be very responsive
+ # NOTE: this branch is not used by the async system anyway, but
+ # will be hit when the user reads with timeout
endtime = _time() + timeout
delay = self.delay
acquire = waiter.acquire
@@ -133,25 +132,36 @@ class HSCondition(object):
finally:
# reacquire the lock
self.acquire()
+ # END assure release lock
def notify(self, n=1):
+ """Its vital that this method is threadsafe - to be fast we don'd get a lock,
+ but instead rely on pseudo-atomic operations that come with the GIL.
+ Hence we use pop in the n=1 case to be truly atomic.
+ In the multi-notify case, we acquire a lock just for safety, as otherwise
+ we might pop too much of someone else notifies n waiters as well, which
+ would in the worst case lead to double-releases of locks."""
if not self._waiters:
return
- waiters = self._waiters
if n == 1:
- waiters[0].release()
+ # so here we assume this is thead-safe ! It wouldn't be in any other
+ # language, but python it is.
try:
- waiters.pop(0)
+ self._waiters.pop(0).release()
except IndexError:
pass
else:
- print "notify", waiters, n
- for waiter in waiters[:n]:
- waiter.release()
- try:
- waiters.remove(waiter)
- except ValueError:
- pass
+ self.acquire()
+ # once the waiter resumes, he will want to acquire the lock
+ # and waits again, but only until we are done, which is important
+ # to do that in a thread-safe fashion
+ try:
+ for i in range(min(n, len(self._waiters))):
+ self._waiters.pop(0).release()
+ # END for each waiter to resume
+ finally:
+ self.release()
+ # END assure we release our lock
# END handle n = 1 case faster
def notify_all(self):
@@ -164,7 +174,8 @@ class ReadOnly(Exception):
class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance.
Additionally it has a threadsafe writable flag, which will alert all readers
- that there is nothing more to get here."""
+ that there is nothing more to get here.
+ All default-queue code was cleaned up for performance."""
__slots__ = ('mutex', 'not_empty', 'queue', '_writable')
def __init__(self, maxsize=0):
@@ -222,7 +233,7 @@ class AsyncQueue(Queue):
self.not_empty.notify()
def get(self, block=True, timeout=None):
- self.not_empty.acquire() # == self.mutex.acquire in that case
+ self.mutex.acquire()
q = self.queue
try:
if block:
@@ -246,7 +257,8 @@ class AsyncQueue(Queue):
raise Empty
# END handle unblocking reason
finally:
- self.not_empty.release()
+ self.mutex.release()
+ # END assure lock is released
#} END utilities