Diffstat (limited to 'lib/git/async')
-rw-r--r--  lib/git/async/channel.py  | 35
-rw-r--r--  lib/git/async/pool.py     |  7
-rw-r--r--  lib/git/async/task.py     |  2
-rw-r--r--  lib/git/async/thread.py   |  3
-rw-r--r--  lib/git/async/util.py     | 25
5 files changed, 38 insertions(+), 34 deletions(-)
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 08323582..5c52b1dc 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -6,6 +6,7 @@ from Queue import (
from util import (
AsyncQueue,
+ ReadOnly
)
from time import time
@@ -59,6 +60,7 @@ class WChannel(Channel):
# let the queue handle the 'closed' attribute, we write much more often
# to an open channel than to a closed one, saving a few cycles
try:
+ print "putting item", item, id(self._queue.queue)
self._queue.put(item, block, timeout)
except ReadOnly:
raise IOError("Cannot write to a closed channel")
@@ -74,6 +76,7 @@ class WChannel(Channel):
an error"""
# yes, close it a little too early, better than having anyone put
# additional items
+ print "closing channel", self
self._closed = True
self._queue.set_writable(False)
@@ -102,7 +105,7 @@ class RChannel(Channel):
:param count: given amount of items to read. If < 1, all items will be read
:param block: if True, the call will block until an item is available
:param timeout: if positive and block is True, it will block only for the
- given amount of seconds.
+ given amount of seconds, returning the items it received so far.
:return: single item in a list if count is 1, or a list of count items.
If the channel was empty and count was 1, an empty list will be returned.
If count was greater than 1, a list with fewer than count items will be
@@ -149,27 +152,29 @@ class RChannel(Channel):
try:
out.append(queue.get(block, timeout))
except Empty:
- pass
+ # here we are only if there is nothing on the queue,
+ # and if we are blocking. If we are not blocking, this
+ # indicates that the queue was set unwritable in the meanwhile.
+ # Hence we can abort now to prevent reading (possibly) forever.
+ # Besides, this is racy, as all threads will rip on the channel
+ # without waiting until it's empty.
+ if not block:
+ break
# END ignore empty
# if we have been unblocked because the closed state changed
# in the meanwhile, stop trying
# NOTE: must NOT cache _wc
if self._wc.closed:
- # its racing time - all threads waiting for the queue
- # are awake now, and we actually can't be sure its empty
- # Hence we pop it empty without blocking, getting as much
- # as we can. This effectively lets us race ( with mutexes )
- # of the other threads.
- try:
- while True:
- out.append(queue.get(False))
- # END pop it empty
- except Empty:
- pass
- # END ignore emptyness, we have all
+ # If we were closed, we drop out even if there might still
+ # be items. Now it's time to get these items, according to
+ # our count. Just switch to non-blocking mode.
+ # If we were to read unlimited items, this would run forever,
+ # but the Empty exception handler takes care of this.
+ block = False
- break
+ # we don't continue, but let the timer decide whether
+ # it wants to abort
# END handle channel closed
if time() >= endtime:
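
Taken together, the change to this read loop is: block with a timeout while the writer is open, and once it closes, flip to non-blocking gets and let the Empty handler end the loop instead of racing the other readers for the leftovers. Below is a minimal sketch of that pattern, assuming a plain Queue and a hypothetical writer_closed() callback rather than the real WChannel/RChannel pair:

from Queue import Queue, Empty   # Python 2; the module is 'queue' in Python 3
from time import time

def read_until_closed(queue, writer_closed, count=0, timeout=1.0):
    """Read up to count items (all items if count < 1). While the writer is
    open we block briefly per get(); once it closes we switch to a
    non-blocking drain and stop at the first Empty."""
    out = list()
    endtime = time() + timeout
    block = True
    while count < 1 or len(out) < count:
        try:
            out.append(queue.get(block, 0.05))
        except Empty:
            if not block:
                # nothing queued and nobody can write anymore - we are done
                break
        # END ignore empty
        if writer_closed():
            # writer is gone: stop waiting, just pop what is still queued
            block = False
        # END handle closed writer
        if time() >= endtime:
            break
        # END honour the overall timeout
    # END read loop
    return out

With count left at 0 this returns whatever was written before the close, which is the behaviour the rewritten loop above is aiming for.
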
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index d6b5711d..cf1c2199 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,8 +80,8 @@ class RPoolChannel(RChannel):
# * make no assumptions if there are multiple consumers
# *
have_enough = False
- if count > 0:
- have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ #if count > 0:
+ # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
@@ -319,6 +319,7 @@ class Pool(object):
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
+ # 1 as it's stored on the task, 1 for the getrefcount call
if sys.getrefcount(task._out_wc) < 3:
self.del_task(task)
#} END internal
@@ -403,7 +404,7 @@ class Pool(object):
if not task in self._tasks.nodes:
return self
# END early abort
-
+ print "deleting ", id(task)
# the task we are currently deleting could also be processed by
# a thread right now. We don't care about that, as it takes care of
# its write channel itself and sends everything it can to it.
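
The refcount trick in _del_task_if_orphaned is easy to get wrong by one, which is what the added comment spells out: the channel always has one reference from the task attribute plus one temporary reference from the getrefcount() call itself, so a count below three means no consumer holds it anymore. A toy illustration of the same check (the _DemoTask and is_orphaned names are made up for this sketch):

import sys

class _DemoTask(object):
    """Stand-in for OutputChannelTask - only holds a write channel."""
    def __init__(self, out_wc):
        self._out_wc = out_wc

def is_orphaned(task):
    # getrefcount sees: 1 ref held by task._out_wc itself +
    # 1 temporary ref for the getrefcount call. Anything below 3
    # therefore means no reader holds the channel anymore.
    return sys.getrefcount(task._out_wc) < 3

task = _DemoTask(out_wc=object())
reader = task._out_wc           # a consumer still references the channel
print(is_orphaned(task))        # False - the reader keeps it alive
del reader
print(is_orphaned(task))        # True - only the task itself holds it
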
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index ce701c86..97521cae 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -88,6 +88,7 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
+ print "task read", len(items)
try:
# increase the ref-count - we use this to determine whether anyone else
# is currently handling our output channel. As this method runs asynchronously,
@@ -117,6 +118,7 @@ class OutputChannelTask(Node):
# END handle single apply
except Exception, e:
self._exc = e
+ print str(e) # TODO: REMOVE DEBUG, or make it use logging
self.set_done()
# unschedule all, we don't know how many have actually been produced
# but only if we don't apply single please
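
The try/except around the per-item loop is the usual pattern for tasks running on worker threads: the exception is captured on the task and the task is marked done, so the thread survives and whoever reads the pool's output can check for and re-raise the stored error. A stripped-down sketch of that pattern (the FailSafeTask name and its callables are illustrative, not the real OutputChannelTask interface):

class FailSafeTask(object):
    """Runs a transformation over read items; failures are stored, not raised."""
    def __init__(self, read, transform, write):
        self._read = read          # callable(count) -> list of input items
        self._fun = transform      # per-item transformation
        self._write = write        # callable(result) sending one item downstream
        self._exc = None
        self._done = False

    def set_done(self):
        self._done = True

    def error(self):
        """Return the stored exception, if any - the consumer re-raises it."""
        return self._exc

    def process(self, count=0):
        items = self._read(count)
        try:
            for item in items:
                self._write(self._fun(item))
        except Exception as e:
            # keep the worker thread alive: remember the failure and finish
            self._exc = e
            self.set_done()
        # END handle exception

A consumer would call error() after reading and re-raise there, which keeps the failure on the reading thread where it can actually be handled.
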
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 4240a664..5faad4f8 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -124,6 +124,7 @@ class WorkerThread(TerminatableThread):
def run(self):
"""Process input tasks until we receive the quit signal"""
+ gettask = self.inq.get
while True:
self._current_routine = None
if self._should_terminate():
@@ -132,7 +133,7 @@ class WorkerThread(TerminatableThread):
# don't wait too long, instead check for the termination request more often
try:
- tasktuple = self.inq.get(True, 1)
+ tasktuple = gettask(True, 0.25)
except Queue.Empty:
continue
# END get task with timeout
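
Two small things happen in this hunk: the bound inq.get is cached outside the loop, and the blocking timeout drops from 1s to 0.25s, so a termination request is noticed within a quarter of a second instead of a full second. A self-contained sketch of that loop shape (DemoWorker and its stop event are illustrative, not the TerminatableThread API):

import threading
from Queue import Queue, Empty   # Python 2; 'queue' in Python 3

class DemoWorker(threading.Thread):
    """Pulls (routine, arg) tuples off a queue and checks a stop flag
    between short, timed-out get() calls."""
    def __init__(self, inq):
        threading.Thread.__init__(self)
        self.inq = inq
        self._stop_requested = threading.Event()

    def stop_and_join(self):
        self._stop_requested.set()
        self.join()

    def run(self):
        gettask = self.inq.get          # avoid re-resolving the attribute per loop
        while True:
            if self._stop_requested.is_set():
                break
            try:
                # short timeout: wake up often enough to notice a
                # termination request without busy-waiting
                routine, arg = gettask(True, 0.25)
            except Empty:
                continue
            # END get task with timeout
            routine(arg)
        # END worker loop

inq = Queue()
w = DemoWorker(inq)
w.start()
inq.put((lambda x: None, 42))
w.stop_and_join()
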
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 51219cc4..6d09de59 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -63,7 +63,7 @@ class SyncQueue(deque):
except IndexError:
raise Empty
# END raise empty
-
+
def empty(self):
return len(self) == 0
@@ -86,13 +86,13 @@ class HSCondition(object):
self._waiters = list()
def release(self):
- return self._lock.release()
+ self._lock.release()
def acquire(self, block=None):
if block is None:
self._lock.acquire()
else:
- return self._lock.acquire(block)
+ self._lock.acquire(block)
def wait(self, timeout=None):
waiter = _allocate_lock()
@@ -145,6 +145,7 @@ class HSCondition(object):
except IndexError:
pass
else:
+ print "notify", waiters, n
for waiter in waiters[:n]:
waiter.release()
try:
@@ -156,16 +157,6 @@ class HSCondition(object):
def notify_all(self):
self.notify(len(self._waiters))
-
-class _AsyncQueue(Queue):
- """A queue using different condition objects to gain multithreading performance"""
- def __init__(self, maxsize=0):
- Queue.__init__(self, maxsize)
-
- self.not_empty = HSCondition(self.mutex)
- self.not_full = HSCondition(self.mutex)
- self.all_tasks_done = HSCondition(self.mutex)
-
class ReadOnly(Exception):
"""Thrown when trying to write to a read-only queue"""
@@ -205,11 +196,12 @@ class AsyncQueue(Queue):
self._writable = state
return old
finally:
+ self.mutex.release()
+
# if we won't receive any more items, inform the getters
if not state:
self.not_empty.notify_all()
# END tell everyone
- self.mutex.release()
# END handle locking
def empty(self):
@@ -222,6 +214,7 @@ class AsyncQueue(Queue):
def put(self, item, block=True, timeout=None):
self.mutex.acquire()
if not self._writable:
+ self.mutex.release()
raise ReadOnly
# END handle read-only
self.queue.append(item)
@@ -245,7 +238,9 @@ class AsyncQueue(Queue):
self.not_empty.wait(remaining)
# END handle timeout mode
# END handle block
- # can happen if we woke up because we are not writable anymore
+
+ # can throw if we woke up because we are not writable anymore
+ print len(q), id(q), current_thread()
try:
return q.popleft()
except IndexError:
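
The set_writable/put changes above boil down to one contract: closing the queue makes put raise ReadOnly (releasing the mutex first so it is not left held), while blocked getters are woken so they can drain whatever is left. A compressed sketch of that contract, using the standard threading.Condition in place of HSCondition; the ClosableQueue name is illustrative, and the explicit mutex.release() bookkeeping from the diff is delegated to 'with' blocks here:

import threading
from collections import deque
from Queue import Empty            # Python 2; 'queue' in Python 3

class ReadOnly(Exception):
    """Raised when writing to a queue that was switched read-only."""

class ClosableQueue(object):
    """Sketch of the AsyncQueue semantics shown in the diff."""
    def __init__(self):
        self.queue = deque()
        self.not_empty = threading.Condition()
        self._writable = True

    def set_writable(self, state):
        with self.not_empty:
            old, self._writable = self._writable, state
            if not state:
                # no more items will arrive - wake every blocked getter so
                # it can drain what is left and then give up
                self.not_empty.notify_all()
        return old

    def put(self, item):
        with self.not_empty:
            if not self._writable:
                raise ReadOnly     # the 'with' block releases the lock for us
            self.queue.append(item)
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        with self.not_empty:
            while block and not self.queue and self._writable:
                self.not_empty.wait(timeout)
                if timeout is not None:
                    break          # at most one timed wait
            try:
                return self.queue.popleft()
            except IndexError:
                raise Empty

A short usage run-through of the same semantics:

q = ClosableQueue()
q.put(1)
q.set_writable(False)
try:
    q.put(2)
except ReadOnly:
    print("queue is closed for writing")
print(q.get())                     # -> 1, still readable after closing
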