summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py5
-rw-r--r--lib/git/async/task.py23
-rw-r--r--lib/git/async/thread.py8
-rw-r--r--lib/git/async/util.py19
4 files changed, 38 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7bddf7da..7ed6fd8e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -331,9 +331,8 @@ class Pool(object):
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
- the call may block as it waits for threads to finish.
- When reducing the size to zero, this thread will process all remaining
- items on the queue.
+ threads will continue with their work until they are done before effectively
+ being removed.
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f1448f96..dd2bd351 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,4 +1,5 @@
from graph import Node
+from util import ReadOnly
import threading
import sys
@@ -117,8 +118,9 @@ class OutputChannelTask(Node):
wc.write(rval)
# END handle single apply
except Exception, e:
- self._exc = e
print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
+
+ # be sure our task is not scheduled again
self.set_done()
# unschedule all, we don't know how many have been produced actually
# but only if we don't apply single please
@@ -127,6 +129,25 @@ class OutputChannelTask(Node):
self._scheduled_items -= len(items)
self._slock.release()
# END unschedule all
+
+ # PROBLEM: We have failed to create at least one item, hence its not
+ # garantueed that enough items will be produced for a possibly blocking
+ # client on the other end. This is why we have no other choice but
+ # to close the channel, preventing the possibility of blocking.
+ # This implies that dependent tasks will go down with us, but that is
+ # just the right thing to do of course - one loose link in the chain ...
+ # Other chunks of our kind currently being processed will then
+ # fail to write to the channel and fail as well
+ # self.close()
+
+ # If some other chunk of our Task had an error, the channel will be closed
+ # This is not an issue, just be sure we don't overwrite the original
+ # exception with the ReadOnly error that would be emitted in that case.
+ # We imply that ReadOnly is exclusive to us, as it won't be an error
+ # if the user emits it
+ if not isinstance(e, ReadOnly):
+ self._exc = e
+ # END set error flag
# END exception handling
del(wc)
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index cd964f1c..faeda04f 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread):
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
- self.inq = inq or Queue.Queue()
+ self.inq = inq
+ if inq is None:
+ self.inq = Queue.Queue()
self._current_routine = None # routine we execute right now
@classmethod
@@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread):
def run(self):
"""Process input tasks until we receive the quit signal"""
+ print self.name, "starts processing" # DEBUG
+
gettask = self.inq.get
while True:
self._current_routine = None
@@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread):
break
# END make routine call
except StopProcessing:
- print self.name, "stops processing"
+ print self.name, "stops processing" # DEBUG
break
except Exception,e:
print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index dff38f58..b5e1a0c0 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -10,7 +10,6 @@ from threading import (
)
from Queue import (
- Queue,
Empty,
)
@@ -171,15 +170,14 @@ class HSCondition(object):
class ReadOnly(Exception):
"""Thrown when trying to write to a read-only queue"""
-class AsyncQueue(Queue):
+class AsyncQueue(deque):
"""A queue using different condition objects to gain multithreading performance.
Additionally it has a threadsafe writable flag, which will alert all readers
that there is nothing more to get here.
All default-queue code was cleaned up for performance."""
- __slots__ = ('mutex', 'not_empty', 'queue', '_writable')
+ __slots__ = ('mutex', 'not_empty', '_writable')
def __init__(self, maxsize=0):
- self.queue = deque()
self.mutex = Lock()
self.not_empty = HSCondition(self.mutex)
self._writable = True
@@ -187,7 +185,7 @@ class AsyncQueue(Queue):
def qsize(self):
self.mutex.acquire()
try:
- return len(self.queue)
+ return len(self)
finally:
self.mutex.release()
@@ -218,7 +216,7 @@ class AsyncQueue(Queue):
def empty(self):
self.mutex.acquire()
try:
- return not len(self.queue)
+ return not len(self)
finally:
self.mutex.release()
@@ -228,21 +226,20 @@ class AsyncQueue(Queue):
self.mutex.release()
raise ReadOnly
# END handle read-only
- self.queue.append(item)
+ self.append(item)
self.mutex.release()
self.not_empty.notify()
def get(self, block=True, timeout=None):
self.mutex.acquire()
- q = self.queue
try:
if block:
if timeout is None:
- while not len(q) and self._writable:
+ while not len(self) and self._writable:
self.not_empty.wait()
else:
endtime = _time() + timeout
- while not len(q) and self._writable:
+ while not len(self) and self._writable:
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
@@ -252,7 +249,7 @@ class AsyncQueue(Queue):
# can throw if we woke up because we are not writable anymore
try:
- return q.popleft()
+ return self.popleft()
except IndexError:
raise Empty
# END handle unblocking reason