diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 5 | ||||
-rw-r--r-- | lib/git/async/task.py | 23 | ||||
-rw-r--r-- | lib/git/async/thread.py | 8 | ||||
-rw-r--r-- | lib/git/async/util.py | 19 |
4 files changed, 38 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7bddf7da..7ed6fd8e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -331,9 +331,8 @@ class Pool(object): def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, - the call may block as it waits for threads to finish. - When reducing the size to zero, this thread will process all remaining - items on the queue. + threads will continue with their work until they are done before effectively + being removed. :return: self :param size: if 0, the pool will do all work itself in the calling thread, diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f1448f96..dd2bd351 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from util import ReadOnly import threading import sys @@ -117,8 +118,9 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - self._exc = e print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + + # be sure our task is not scheduled again self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -127,6 +129,25 @@ class OutputChannelTask(Node): self._scheduled_items -= len(items) self._slock.release() # END unschedule all + + # PROBLEM: We have failed to create at least one item, hence its not + # garantueed that enough items will be produced for a possibly blocking + # client on the other end. This is why we have no other choice but + # to close the channel, preventing the possibility of blocking. + # This implies that dependent tasks will go down with us, but that is + # just the right thing to do of course - one loose link in the chain ... + # Other chunks of our kind currently being processed will then + # fail to write to the channel and fail as well + # self.close() + + # If some other chunk of our Task had an error, the channel will be closed + # This is not an issue, just be sure we don't overwrite the original + # exception with the ReadOnly error that would be emitted in that case. + # We imply that ReadOnly is exclusive to us, as it won't be an error + # if the user emits it + if not isinstance(e, ReadOnly): + self._exc = e + # END set error flag # END exception handling del(wc) diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index cd964f1c..faeda04f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() - self.inq = inq or Queue.Queue() + self.inq = inq + if inq is None: + self.inq = Queue.Queue() self._current_routine = None # routine we execute right now @classmethod @@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + print self.name, "starts processing" # DEBUG + gettask = self.inq.get while True: self._current_routine = None @@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread): break # END make routine call except StopProcessing: - print self.name, "stops processing" + print self.name, "stops processing" # DEBUG break except Exception,e: print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dff38f58..b5e1a0c0 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -10,7 +10,6 @@ from threading import ( ) from Queue import ( - Queue, Empty, ) @@ -171,15 +170,14 @@ class HSCondition(object): class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" -class AsyncQueue(Queue): +class AsyncQueue(deque): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers that there is nothing more to get here. All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', 'queue', '_writable') + __slots__ = ('mutex', 'not_empty', '_writable') def __init__(self, maxsize=0): - self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) self._writable = True @@ -187,7 +185,7 @@ class AsyncQueue(Queue): def qsize(self): self.mutex.acquire() try: - return len(self.queue) + return len(self) finally: self.mutex.release() @@ -218,7 +216,7 @@ class AsyncQueue(Queue): def empty(self): self.mutex.acquire() try: - return not len(self.queue) + return not len(self) finally: self.mutex.release() @@ -228,21 +226,20 @@ class AsyncQueue(Queue): self.mutex.release() raise ReadOnly # END handle read-only - self.queue.append(item) + self.append(item) self.mutex.release() self.not_empty.notify() def get(self, block=True, timeout=None): self.mutex.acquire() - q = self.queue try: if block: if timeout is None: - while not len(q) and self._writable: + while not len(self) and self._writable: self.not_empty.wait() else: endtime = _time() + timeout - while not len(q) and self._writable: + while not len(self) and self._writable: remaining = endtime - _time() if remaining <= 0.0: raise Empty @@ -252,7 +249,7 @@ class AsyncQueue(Queue): # can throw if we woke up because we are not writable anymore try: - return q.popleft() + return self.popleft() except IndexError: raise Empty # END handle unblocking reason |