summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 11:28:37 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 11:28:37 +0200
commitf2c8d26d3b25b864ad48e6de018757266b59f708 (patch)
tree4c455469b09e049ff8e29e6166bc2118356dc9cd /lib/git
parent15941ca090a2c3c987324fc911bbc6f89e941c47 (diff)
downloadgitpython-f2c8d26d3b25b864ad48e6de018757266b59f708.tar.gz
thread: fixed initialization problem if an empty iterable was handed in
queue: Queue now derives from deque directly, which safes one dict lookup as the queue does not need to be accessed through self anymore pool test improved to better verify threads are started correctly
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/pool.py5
-rw-r--r--lib/git/async/task.py23
-rw-r--r--lib/git/async/thread.py8
-rw-r--r--lib/git/async/util.py19
4 files changed, 38 insertions, 17 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7bddf7da..7ed6fd8e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -331,9 +331,8 @@ class Pool(object):
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
- the call may block as it waits for threads to finish.
- When reducing the size to zero, this thread will process all remaining
- items on the queue.
+ threads will continue with their work until they are done before effectively
+ being removed.
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index f1448f96..dd2bd351 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -1,4 +1,5 @@
from graph import Node
+from util import ReadOnly
import threading
import sys
@@ -117,8 +118,9 @@ class OutputChannelTask(Node):
wc.write(rval)
# END handle single apply
except Exception, e:
- self._exc = e
print >> sys.stderr, "task error:", str(e) # TODO: REMOVE DEBUG, or make it use logging
+
+ # be sure our task is not scheduled again
self.set_done()
# unschedule all, we don't know how many have been produced actually
# but only if we don't apply single please
@@ -127,6 +129,25 @@ class OutputChannelTask(Node):
self._scheduled_items -= len(items)
self._slock.release()
# END unschedule all
+
+ # PROBLEM: We have failed to create at least one item, hence its not
+ # garantueed that enough items will be produced for a possibly blocking
+ # client on the other end. This is why we have no other choice but
+ # to close the channel, preventing the possibility of blocking.
+ # This implies that dependent tasks will go down with us, but that is
+ # just the right thing to do of course - one loose link in the chain ...
+ # Other chunks of our kind currently being processed will then
+ # fail to write to the channel and fail as well
+ # self.close()
+
+ # If some other chunk of our Task had an error, the channel will be closed
+ # This is not an issue, just be sure we don't overwrite the original
+ # exception with the ReadOnly error that would be emitted in that case.
+ # We imply that ReadOnly is exclusive to us, as it won't be an error
+ # if the user emits it
+ if not isinstance(e, ReadOnly):
+ self._exc = e
+ # END set error flag
# END exception handling
del(wc)
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index cd964f1c..faeda04f 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -125,7 +125,9 @@ class WorkerThread(TerminatableThread):
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
- self.inq = inq or Queue.Queue()
+ self.inq = inq
+ if inq is None:
+ self.inq = Queue.Queue()
self._current_routine = None # routine we execute right now
@classmethod
@@ -135,6 +137,8 @@ class WorkerThread(TerminatableThread):
def run(self):
"""Process input tasks until we receive the quit signal"""
+ print self.name, "starts processing" # DEBUG
+
gettask = self.inq.get
while True:
self._current_routine = None
@@ -166,7 +170,7 @@ class WorkerThread(TerminatableThread):
break
# END make routine call
except StopProcessing:
- print self.name, "stops processing"
+ print self.name, "stops processing" # DEBUG
break
except Exception,e:
print >> sys.stderr, "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index dff38f58..b5e1a0c0 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -10,7 +10,6 @@ from threading import (
)
from Queue import (
- Queue,
Empty,
)
@@ -171,15 +170,14 @@ class HSCondition(object):
class ReadOnly(Exception):
"""Thrown when trying to write to a read-only queue"""
-class AsyncQueue(Queue):
+class AsyncQueue(deque):
"""A queue using different condition objects to gain multithreading performance.
Additionally it has a threadsafe writable flag, which will alert all readers
that there is nothing more to get here.
All default-queue code was cleaned up for performance."""
- __slots__ = ('mutex', 'not_empty', 'queue', '_writable')
+ __slots__ = ('mutex', 'not_empty', '_writable')
def __init__(self, maxsize=0):
- self.queue = deque()
self.mutex = Lock()
self.not_empty = HSCondition(self.mutex)
self._writable = True
@@ -187,7 +185,7 @@ class AsyncQueue(Queue):
def qsize(self):
self.mutex.acquire()
try:
- return len(self.queue)
+ return len(self)
finally:
self.mutex.release()
@@ -218,7 +216,7 @@ class AsyncQueue(Queue):
def empty(self):
self.mutex.acquire()
try:
- return not len(self.queue)
+ return not len(self)
finally:
self.mutex.release()
@@ -228,21 +226,20 @@ class AsyncQueue(Queue):
self.mutex.release()
raise ReadOnly
# END handle read-only
- self.queue.append(item)
+ self.append(item)
self.mutex.release()
self.not_empty.notify()
def get(self, block=True, timeout=None):
self.mutex.acquire()
- q = self.queue
try:
if block:
if timeout is None:
- while not len(q) and self._writable:
+ while not len(self) and self._writable:
self.not_empty.wait()
else:
endtime = _time() + timeout
- while not len(q) and self._writable:
+ while not len(self) and self._writable:
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
@@ -252,7 +249,7 @@ class AsyncQueue(Queue):
# can throw if we woke up because we are not writable anymore
try:
- return q.popleft()
+ return self.popleft()
except IndexError:
raise Empty
# END handle unblocking reason