diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 11:28:37 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-09 11:28:37 +0200 |
commit | f2c8d26d3b25b864ad48e6de018757266b59f708 (patch) | |
tree | 4c455469b09e049ff8e29e6166bc2118356dc9cd /lib/git | |
parent | 15941ca090a2c3c987324fc911bbc6f89e941c47 (diff) | |
download | gitpython-f2c8d26d3b25b864ad48e6de018757266b59f708.tar.gz |
thread: fixed initialization problem if an empty iterable was handed in
queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore
pool test improved to better verify threads are started correctly
Diffstat (limited to 'lib/git')
-rw-r--r-- | lib/git/async/pool.py | 5 | ||||
-rw-r--r-- | lib/git/async/task.py | 23 | ||||
-rw-r--r-- | lib/git/async/thread.py | 8 | ||||
-rw-r--r-- | lib/git/async/util.py | 19 |
4 files changed, 38 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7bddf7da..7ed6fd8e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -331,9 +331,8 @@ class Pool(object): def set_size(self, size=0): """Set the amount of workers to use in this pool. When reducing the size, - the call may block as it waits for threads to finish. - When reducing the size to zero, this thread will process all remaining - items on the queue. + threads will continue with their work until they are done before effectively + being removed. :return: self :param size: if 0, the pool will do all work itself in the calling thread, diff --git a/lib/git/async/task.py b/lib/git/async/task.py index f1448f96..dd2bd351 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -1,4 +1,5 @@ from graph import Node +from util import ReadOnly import threading import sys @@ -117,8 +118,9 @@ class OutputChannelTask(Node): wc.write(rval) # END handle single apply except Exception, e: - self._exc = e print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging + + # be sure our task is not scheduled again self.set_done() # unschedule all, we don't know how many have been produced actually # but only if we don't apply single please @@ -127,6 +129,25 @@ class OutputChannelTask(Node): self._scheduled_items -= len(items) self._slock.release() # END unschedule all + + # PROBLEM: We have failed to create at least one item, hence its not + # garantueed that enough items will be produced for a possibly blocking + # client on the other end. This is why we have no other choice but + # to close the channel, preventing the possibility of blocking. + # This implies that dependent tasks will go down with us, but that is + # just the right thing to do of course - one loose link in the chain ... + # Other chunks of our kind currently being processed will then + # fail to write to the channel and fail as well + # self.close() + + # If some other chunk of our Task had an error, the channel will be closed + # This is not an issue, just be sure we don't overwrite the original + # exception with the ReadOnly error that would be emitted in that case. + # We imply that ReadOnly is exclusive to us, as it won't be an error + # if the user emits it + if not isinstance(e, ReadOnly): + self._exc = e + # END set error flag # END exception handling del(wc) diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py index cd964f1c..faeda04f 100644 --- a/lib/git/async/thread.py +++ b/lib/git/async/thread.py @@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread): def __init__(self, inq = None): super(WorkerThread, self).__init__() - self.inq = inq or Queue.Queue() + self.inq = inq + if inq is None: + self.inq = Queue.Queue() self._current_routine = None # routine we execute right now @classmethod @@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread): def run(self): """Process input tasks until we receive the quit signal""" + print self.name, "starts processing" # DEBUG + gettask = self.inq.get while True: self._current_routine = None @@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread): break # END make routine call except StopProcessing: - print self.name, "stops processing" + print self.name, "stops processing" # DEBUG break except Exception,e: print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e)) diff --git a/lib/git/async/util.py b/lib/git/async/util.py index dff38f58..b5e1a0c0 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -10,7 +10,6 @@ from threading import ( ) from Queue import ( - Queue, Empty, ) @@ -171,15 +170,14 @@ class HSCondition(object): class ReadOnly(Exception): """Thrown when trying to write to a read-only queue""" -class AsyncQueue(Queue): +class AsyncQueue(deque): """A queue using different condition objects to gain multithreading performance. Additionally it has a threadsafe writable flag, which will alert all readers that there is nothing more to get here. All default-queue code was cleaned up for performance.""" - __slots__ = ('mutex', 'not_empty', 'queue', '_writable') + __slots__ = ('mutex', 'not_empty', '_writable') def __init__(self, maxsize=0): - self.queue = deque() self.mutex = Lock() self.not_empty = HSCondition(self.mutex) self._writable = True @@ -187,7 +185,7 @@ class AsyncQueue(Queue): def qsize(self): self.mutex.acquire() try: - return len(self.queue) + return len(self) finally: self.mutex.release() @@ -218,7 +216,7 @@ class AsyncQueue(Queue): def empty(self): self.mutex.acquire() try: - return not len(self.queue) + return not len(self) finally: self.mutex.release() @@ -228,21 +226,20 @@ class AsyncQueue(Queue): self.mutex.release() raise ReadOnly # END handle read-only - self.queue.append(item) + self.append(item) self.mutex.release() self.not_empty.notify() def get(self, block=True, timeout=None): self.mutex.acquire() - q = self.queue try: if block: if timeout is None: - while not len(q) and self._writable: + while not len(self) and self._writable: self.not_empty.wait() else: endtime = _time() + timeout - while not len(q) and self._writable: + while not len(self) and self._writable: remaining = endtime - _time() if remaining <= 0.0: raise Empty @@ -252,7 +249,7 @@ class AsyncQueue(Queue): # can throw if we woke up because we are not writable anymore try: - return q.popleft() + return self.popleft() except IndexError: raise Empty # END handle unblocking reason |