summaryrefslogtreecommitdiff
path: root/lib/git/async/util.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
commit3776f7a766851058f6435b9f606b16766425d7ca (patch)
tree8096d6d84090f4abc5aad374c1fe6f64088572a6 /lib/git/async/util.py
parent09c3f39ceb545e1198ad7a3f470d4ec896ce1add (diff)
downloadgitpython-3776f7a766851058f6435b9f606b16766425d7ca.tar.gz
The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag.
Diffstat (limited to 'lib/git/async/util.py')
-rw-r--r--lib/git/async/util.py32
1 files changed, 20 insertions, 12 deletions
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index fb63ccaa..01073f6d 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -73,21 +73,22 @@ class SyncQueue(deque):
class HSCondition(object):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- __slots__ = ("acquire", "release", "_lock", '_waiters')
+ # __slots__ = ("acquire", "release", "_lock", '_waiters')
+ __slots__ = ("_lock", '_waiters')
delay = 0.00002 # reduces wait times, but increases overhead
def __init__(self, lock=None):
if lock is None:
lock = Lock()
self._lock = lock
- self.acquire = lock.acquire
- self.release = lock.release
+ #self.acquire = lock.acquire
+ #self.release = lock.release
self._waiters = list()
- def __release(self):
+ def release(self):
return self._lock.release()
- def __acquire(self, block=None):
+ def acquire(self, block=None):
if block is None:
self._lock.acquire()
else:
@@ -156,7 +157,7 @@ class HSCondition(object):
self.notify(len(self._waiters))
-class AsyncQueue(Queue):
+class _AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
@@ -166,7 +167,7 @@ class AsyncQueue(Queue):
self.all_tasks_done = HSCondition(self.mutex)
-class _AsyncQueue(Queue):
+class AsyncQueue(Queue):
"""A queue using different condition objects to gain multithreading performance"""
__slots__ = ('mutex', 'not_empty', 'queue')
@@ -194,9 +195,9 @@ class _AsyncQueue(Queue):
self.queue.append(item)
self.mutex.release()
self.not_empty.notify()
-
+
def get(self, block=True, timeout=None):
- self.not_empty.acquire()
+ self.not_empty.acquire() # == self.mutex.acquire in that case
q = self.queue
try:
if not block:
@@ -205,16 +206,23 @@ class _AsyncQueue(Queue):
elif timeout is None:
while not len(q):
self.not_empty.wait()
- elif timeout < 0:
- raise ValueError("'timeout' must be a positive number")
else:
+ print "with timeout", timeout
+ import traceback
+ traceback.print_stack()
endtime = _time() + timeout
while not len(q):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
- return q.popleft()
+ # END handle block
+ # can happen if someone else woke us up
+ try:
+ return q.popleft()
+ except IndexError:
+ raise Empty
+ # END handle unblocking reason
finally:
self.not_empty.release()