summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 14:23:58 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 14:23:58 +0200
commit53152a824f5186452504f0b68306d10ebebee416 (patch)
tree9861c0337b4aa9a9600ce840910f7aac2de7425c /lib/git
parent3776f7a766851058f6435b9f606b16766425d7ca (diff)
downloadgitpython-53152a824f5186452504f0b68306d10ebebee416.tar.gz
queue: adjusted queue to be closable ( without own testing yet, except for the pool which runs it ) - its not yet stable, but should be solvable.
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/channel.py44
-rw-r--r--lib/git/async/util.py68
2 files changed, 62 insertions, 50 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 655024fe..08323582 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -6,7 +6,6 @@ from Queue import (
from util import (
AsyncQueue,
- DummyLock
)
from time import time
@@ -56,15 +55,13 @@ class WChannel(Channel):
channel
:param timeout: timeout in seconds for blocking calls.
:raise IOError: when writing into closed file
- :raise EOFError: when writing into a non-blocking full channel
- :note: may block if the channel has a limited capacity"""
- if self._closed:
- raise IOError("Cannot write to a closed channel")
-
+ :raise EOFError: when writing into a non-blocking full channel"""
+ # let the queue handle the 'closed' attribute, we write much more often
+ # to an open channel than to a closed one, saving a few cycles
try:
self._queue.put(item, block, timeout)
- except Full:
- raise EOFError("Capacity of the channel was exeeded")
+ except ReadOnly:
+ raise IOError("Cannot write to a closed channel")
# END exception handling
def size(self):
@@ -75,21 +72,10 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
- mutex = self._queue.mutex
- mutex.acquire()
- # this is atomic already, due to the GIL - no need to get the queue's mutex
- print "channel.close()"
+ # yes, close it a little too early, better than having anyone put
+ # additional items
self._closed = True
- # now make sure that the people waiting for an item are released now
- # As we it could be that some readers are already on their way to initiate
- # a blocking get, we must make sure that locks never block before that happens
-
- # now we are the only one accessing the queue, so change it
- self._queue.mutex = DummyLock()
- print self._queue.not_empty._waiters
- self._queue.not_empty.notify_all()
- print self._queue.not_empty._waiters
- mutex.release()
+ self._queue.set_writable(False)
@property
def closed(self):
@@ -124,6 +110,7 @@ class RChannel(Channel):
If count was < 1, a list with all items that could be read will be
returned."""
# if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
if self._wc.closed or timeout == 0:
block = False
@@ -160,9 +147,7 @@ class RChannel(Channel):
# could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
try:
- print "about to read", i, count, block, timeout
out.append(queue.get(block, timeout))
- print "got one"
except Empty:
pass
# END ignore empty
@@ -176,7 +161,6 @@ class RChannel(Channel):
# Hence we pop it empty without blocking, getting as much
# as we can. This effectively lets us race ( with mutexes )
# of the other threads.
- print "stopped because it was closed"
try:
while True:
out.append(queue.get(False))
@@ -186,11 +170,11 @@ class RChannel(Channel):
# END ignore emptyness, we have all
break
- # END handle cloased
-
- if time() >= endtime:
- break
- # END stop on timeout
+ # END handle channel cloased
+
+ if time() >= endtime:
+ break
+ # END stop operation on timeout
# END for each item
# END handle blocking
return out
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 01073f6d..51219cc4 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -166,15 +166,21 @@ class _AsyncQueue(Queue):
self.not_full = HSCondition(self.mutex)
self.all_tasks_done = HSCondition(self.mutex)
-
+
+class ReadOnly(Exception):
+ """Thrown when trying to write to a read-only queue"""
+
class AsyncQueue(Queue):
- """A queue using different condition objects to gain multithreading performance"""
- __slots__ = ('mutex', 'not_empty', 'queue')
+ """A queue using different condition objects to gain multithreading performance.
+ Additionally it has a threadsafe writable flag, which will alert all readers
+ that there is nothing more to get here."""
+ __slots__ = ('mutex', 'not_empty', 'queue', '_writable')
def __init__(self, maxsize=0):
self.queue = deque()
self.mutex = Lock()
self.not_empty = HSCondition(self.mutex)
+ self._writable = True
def qsize(self):
self.mutex.acquire()
@@ -183,6 +189,29 @@ class AsyncQueue(Queue):
finally:
self.mutex.release()
+ def writable(self):
+ self.mutex.acquire()
+ try:
+ return self._writable
+ finally:
+ self.mutex.release()
+
+ def set_writable(self, state):
+ """Set the writable flag of this queue to True or False
+ :return: The previous state"""
+ self.mutex.acquire()
+ try:
+ old = self._writable
+ self._writable = state
+ return old
+ finally:
+ # if we won't receive anymore items, inform the getters
+ if not state:
+ self.not_empty.notify_all()
+ # END tell everyone
+ self.mutex.release()
+ # END handle locking
+
def empty(self):
self.mutex.acquire()
try:
@@ -192,6 +221,9 @@ class AsyncQueue(Queue):
def put(self, item, block=True, timeout=None):
self.mutex.acquire()
+ if not self._writable:
+ raise ReadOnly
+ # END handle read-only
self.queue.append(item)
self.mutex.release()
self.not_empty.notify()
@@ -200,24 +232,20 @@ class AsyncQueue(Queue):
self.not_empty.acquire() # == self.mutex.acquire in that case
q = self.queue
try:
- if not block:
- if not len(q):
- raise Empty
- elif timeout is None:
- while not len(q):
- self.not_empty.wait()
- else:
- print "with timeout", timeout
- import traceback
- traceback.print_stack()
- endtime = _time() + timeout
- while not len(q):
- remaining = endtime - _time()
- if remaining <= 0.0:
- raise Empty
- self.not_empty.wait(remaining)
+ if block:
+ if timeout is None:
+ while not len(q) and self._writable:
+ self.not_empty.wait()
+ else:
+ endtime = _time() + timeout
+ while not len(q) and self._writable:
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Empty
+ self.not_empty.wait(remaining)
+ # END handle timeout mode
# END handle block
- # can happen if someone else woke us up
+ # can happen if we woke up because we are not writable anymore
try:
return q.popleft()
except IndexError: