summaryrefslogtreecommitdiff
path: root/lib/git/async/channel.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 13:24:44 +0200
commit3776f7a766851058f6435b9f606b16766425d7ca (patch)
tree8096d6d84090f4abc5aad374c1fe6f64088572a6 /lib/git/async/channel.py
parent09c3f39ceb545e1198ad7a3f470d4ec896ce1add (diff)
downloadgitpython-3776f7a766851058f6435b9f606b16766425d7ca.tar.gz
The new channeldesign actually works, but it also shows that its located at the wrong spot. The channel is nothing more than an adapter allowing to read multiple items from a thread-safe queue, the queue itself though must be 'closable' for writing, or needs something like a writable flag.
Diffstat (limited to 'lib/git/async/channel.py')
-rw-r--r--lib/git/async/channel.py108
1 files changed, 56 insertions, 52 deletions
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 2d5ab79c..655024fe 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -4,7 +4,11 @@ from Queue import (
Full
)
-from util import AsyncQueue
+from util import (
+ AsyncQueue,
+ DummyLock
+ )
+
from time import time
import sys
@@ -23,12 +27,9 @@ class Channel(object):
def __new__(cls, *args):
if cls is Channel:
- max_items = 0
- if len(args) == 1:
- max_items = args[0]
- if len(args) > 1:
- raise ValueError("Specify not more than the number of items the channel should take")
- wc = WChannel(max_items)
+ if len(args) > 0:
+ raise ValueError("Cannot take any arguments when creating a new channel")
+ wc = WChannel()
rc = RChannel(wc)
return wc, rc
# END constructor mode
@@ -39,11 +40,11 @@ class WChannel(Channel):
"""The write end of a channel"""
__slots__ = ('_closed', '_queue')
- def __init__(self, max_items=0):
+ def __init__(self):
"""initialize this instance, able to hold max_items at once
Write calls will block if the channel is full, until someone reads from it"""
self._closed = False
- self._queue = AsyncQueue(max_items)
+ self._queue = AsyncQueue()
#{ Interface
@@ -74,7 +75,21 @@ class WChannel(Channel):
def close(self):
"""Close the channel. Multiple close calls on a closed channel are no
an error"""
+ mutex = self._queue.mutex
+ mutex.acquire()
+ # this is atomic already, due to the GIL - no need to get the queue's mutex
+ print "channel.close()"
self._closed = True
+ # now make sure that the people waiting for an item are released now
+ # As we it could be that some readers are already on their way to initiate
+ # a blocking get, we must make sure that locks never block before that happens
+
+ # now we are the only one accessing the queue, so change it
+ self._queue.mutex = DummyLock()
+ print self._queue.not_empty._waiters
+ self._queue.not_empty.notify_all()
+ print self._queue.not_empty._waiters
+ mutex.release()
@property
def closed(self):
@@ -134,58 +149,47 @@ class RChannel(Channel):
pass
# END handle exceptions
else:
- # if we have really bad timing, the source of the channel
- # marks itself closed, but before setting it, the thread
- # switches to us. We read it, read False, and try to fetch
- # something, and never return. The whole closed channel thing
- # is not atomic ( of course )
- # This is why we never block for long, to get a chance to recheck
- # for closed channels.
- # We blend this into the timeout of the user
- ourtimeout = 0.25 # the smaller, the more responsive, but the slower
- wc = self._wc
- timeout = (timeout is None and sys.maxint) or timeout # make sure we can compute with it
- assert timeout != 0.0, "shouldn't block if timeout is 0" # okay safe
- if timeout and ourtimeout > timeout:
- ourtimeout = timeout
- # END setup timeout
-
# to get everything into one loop, we set the count accordingly
if count == 0:
count = sys.maxint
# END handle count
+ endtime = sys.maxint # allows timeout for whole operation
+ if timeout is not None:
+ endtime = time() + timeout
+ # could be improved by a separate: no-endtime branch, saving the time calls
for i in xrange(count):
- have_timeout = False
- st = time()
- while True:
+ try:
+ print "about to read", i, count, block, timeout
+ out.append(queue.get(block, timeout))
+ print "got one"
+ except Empty:
+ pass
+ # END ignore empty
+
+ # if we have been unblocked because the closed state changed
+ # in the meanwhile, stop trying
+ # NOTE: must NOT cache _wc
+ if self._wc.closed:
+ # its racing time - all threads waiting for the queue
+ # are awake now, and we actually can't be sure its empty
+ # Hence we pop it empty without blocking, getting as much
+ # as we can. This effectively lets us race ( with mutexes )
+ # of the other threads.
+ print "stopped because it was closed"
try:
- if wc.closed:
- have_timeout = True
- # its about the 'in the meanwhile' :) - get everything
- # we can in non-blocking mode. This will raise
- try:
- while True:
- out.append(queue.get(False))
- # END until it raises Empty
- except Empty:
- break
- # END finally, out of here
- # END don't continue on closed channels
-
- # END abort reading if it was closed ( in the meanwhile )
- out.append(queue.get(block, ourtimeout))
- break # breakout right away
+ while True:
+ out.append(queue.get(False))
+ # END pop it empty
except Empty:
- if timeout - (time() - st) <= 0:
- # hitting timeout
- have_timeout = True
- break
- # END abort if the user wants no more time spent here
- # END handle timeout
- # END endless timer loop
- if have_timeout:
+ pass
+ # END ignore emptyness, we have all
+
break
+ # END handle cloased
+
+ if time() >= endtime:
+ break
# END stop on timeout
# END for each item
# END handle blocking