author		Sebastian Thiel <byronimo@gmail.com>	2010-06-08 19:25:33 +0200
committer	Sebastian Thiel <byronimo@gmail.com>	2010-06-08 19:25:33 +0200
commit		772b95631916223e472989b43f3a31f61e237f31 (patch)
tree		cac3b488a05502b15532b07bb3848e0fdb2df339 /lib/git
parent		e5c0002d069382db1768349bf0c5ff40aafbf140 (diff)
download	gitpython-772b95631916223e472989b43f3a31f61e237f31.tar.gz
workerthread: adjusted to use a blocking queue; it will receive termination events only through its queue, which boosts performance into bright green levels
Diffstat (limited to 'lib/git')
-rw-r--r--	lib/git/async/channel.py	3
-rw-r--r--	lib/git/async/pool.py	56
-rw-r--r--	lib/git/async/task.py	2
-rw-r--r--	lib/git/async/thread.py	24
-rw-r--r--	lib/git/async/util.py	1
5 files changed, 52 insertions, 34 deletions
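
The change described in the commit message replaces a poll-with-timeout loop by a blocking queue whose termination signal travels through the queue itself. Below is a minimal, self-contained sketch of that pattern; the names (worker, stop) are illustrative only and not GitPython's actual API, though StopProcessing mirrors the class added in thread.py.

import threading
import queue

class StopProcessing(Exception):
    """Sentinel exception: raised inside a worker to end its run loop."""

def stop(_arg=None):
    # placed on the queue like any other task; raising ends the worker
    raise StopProcessing

def worker(inq):
    while True:
        func, arg = inq.get()        # blocks until work arrives - no polling
        try:
            func(arg)
        except StopProcessing:
            break                    # termination arrived through the queue

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()
q.put((print, "processing an item"))
q.put((stop, None))                  # the 'poison pill' that stops the worker
t.join()

Because get() blocks instead of waking up every 0.25 seconds, idle workers burn no CPU, which is the performance gain the commit message refers to.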
diff --git a/lib/git/async/channel.py b/lib/git/async/channel.py
index 5c52b1dc..c05f7383 100644
--- a/lib/git/async/channel.py
+++ b/lib/git/async/channel.py
@@ -60,7 +60,6 @@ class WChannel(Channel):
# let the queue handle the 'closed' attribute, we write much more often
# to an open channel than to a closed one, saving a few cycles
try:
- print "putting item", item, id(self._queue.queue)
self._queue.put(item, block, timeout)
except ReadOnly:
raise IOError("Cannot write to a closed channel")
@@ -76,7 +75,7 @@ class WChannel(Channel):
an error"""
# yes, close it a little too early, better than having anyone put
# additional items
- print "closing channel", self
+ # print "closing channel", self
self._closed = True
self._queue.set_writable(False)
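
For context, the write/close semantics visible in this hunk follow roughly the shape below. This is a simplified sketch: ReadOnly and set_writable appear in the hunk itself, but the WritableQueue helper is an assumed stand-in, not the module's actual queue implementation.

import queue

class ReadOnly(Exception):
    """Raised by the queue once it has been made unwritable."""

class WritableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        self._writable = True

    def set_writable(self, state):
        self._writable = state

    def put(self, item, block=True, timeout=None):
        if not self._writable:
            raise ReadOnly
        super().put(item, block, timeout)

class WChannel:
    def __init__(self):
        self._queue = WritableQueue()
        self._closed = False

    def write(self, item, block=True, timeout=None):
        # let the queue handle the 'closed' state; translate its error
        try:
            self._queue.put(item, block, timeout)
        except ReadOnly:
            raise IOError("Cannot write to a closed channel")

    def close(self):
        # close a little early, better than letting anyone put more items
        self._closed = True
        self._queue.set_writable(False)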
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index a915f7b0..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
"""Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+ WorkerThread,
+ StopProcessing,
+ )
from threading import Lock
from util import (
@@ -147,7 +150,7 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_workers', # list of worker threads
+ '_num_workers', # number of workers
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
'_taskgraph_lock', # lock for accessing the task graph
@@ -169,7 +172,7 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._workers = list()
+ self._num_workers = 0
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
@@ -270,7 +273,7 @@ class Pool(object):
# into the loop would be less code, but ... slower
# DEBUG
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
+ if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
queue = self._queue
@@ -323,7 +326,7 @@ class Pool(object):
#{ Interface
def size(self):
""":return: amount of workers in the pool"""
- return len(self._workers)
+ return self._num_workers
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
@@ -341,34 +344,41 @@ class Pool(object):
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
- cur_count = len(self._workers)
+ cur_count = self._num_workers
if cur_count < size:
- for i in range(size - cur_count):
- worker = self.WorkerCls(self._queue)
- worker.start()
- self._workers.append(worker)
- # END for each new worker to create
- elif cur_count > size:
# we can safely increase the size, even from serial mode, as we would
# only be able to do this if the serial ( sync ) mode finished processing.
# Just adding more workers is not a problem at all.
+ add_count = size - cur_count
+ for i in range(add_count):
+ print "Add worker"
+ self.WorkerCls(self._queue).start()
+ # END for each new worker to create
+ self._num_workers += add_count
+ elif cur_count > size:
+ # We don't care which thread exactly gets hit by our stop request;
+ # on their way out, workers will still consume remaining tasks, but
+ # new ones could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- self._workers[i].stop_and_join()
+ print "stop worker"
+ self._queue.put((self.WorkerCls.stop, True)) # arg doesn't matter
# END for each thread to stop
- del(self._workers[:del_count])
+ self._num_workers -= del_count
# END handle count
if size == 0:
- while not self._queue.empty():
- try:
- taskmethod, count = self._queue.get(False)
- taskmethod(count)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- self._consumed_tasks = SyncQueue()
+ # NOTE: we do not process any tasks still on the queue, as we will
+ # naturally do that the next time we read, and only for the tasks
+ # that are actually required. The queue will keep the tasks,
+ # and once we are deleted, they will vanish without additional
+ # time spent on them, should there be no consumers anyway.
+ # If we re-enable some workers later, they will continue with the
+ # remaining tasks, probably with nothing to do.
+ # We can't clear the task queue if we have removed workers
+ # as they will receive the termination signal through it, and if
+ # we had added workers, we wouldn't be here ;).
+ pass
# END process queue
return self
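
Put together, the resizing logic above amounts to something like the following sketch. It is written as a standalone function over a pool-like object to keep it short; the attribute names mirror the diff, but this is not the full Pool class.

def set_size(pool, size=0):
    """Grow or shrink the pool; shrinking sends stop requests through the queue."""
    cur_count = pool._num_workers
    if cur_count < size:
        # growing is always safe: just start more workers on the shared queue
        for _ in range(size - cur_count):
            pool.WorkerCls(pool._queue).start()
        pool._num_workers = size
    elif cur_count > size:
        # shrinking: any worker may pick up a stop request; it finishes the
        # item it is currently working on, then terminates
        for _ in range(cur_count - size):
            pool._queue.put((pool.WorkerCls.stop, True))   # arg is ignored
        pool._num_workers = size
    # when size reaches 0, tasks still on the queue are deliberately left
    # there: they are either consumed by the outgoing workers or handled
    # lazily on the next read
    return pool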
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 5edd40bb..f9536a45 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -89,7 +89,7 @@ class OutputChannelTask(Node):
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
- print "task read", len(items)
+ # print "task read", len(items)
try:
# increase the ref-count - we use this to determine whether anyone else
# is currently handling our output channel. As this method runs asynchronously,
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 5faad4f8..556b7e92 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -80,6 +80,10 @@ class TerminatableThread(threading.Thread):
self._terminated()
#} END interface
+
+class StopProcessing(Exception):
+ """If thrown in a function processed by a WorkerThread, it will terminate"""
+
class WorkerThread(TerminatableThread):
""" This base allows to call functions on class instances natively.
@@ -122,6 +126,11 @@ class WorkerThread(TerminatableThread):
self.inq = inq or Queue.Queue()
self._current_routine = None # routine we execute right now
+ @classmethod
+ def stop(cls, *args):
+ """If send via the inq of the thread, it will stop once it processed the function"""
+ raise StopProcessing
+
def run(self):
"""Process input tasks until we receive the quit signal"""
gettask = self.inq.get
@@ -131,12 +140,8 @@ class WorkerThread(TerminatableThread):
break
# END check for stop request
- # don't wait too long, instead check for the termination request more often
- try:
- tasktuple = gettask(True, 0.25)
- except Queue.Empty:
- continue
- # END get task with timeout
+ # we wait and block - to terminate, send the 'stop' method
+ tasktuple = gettask()
# needing exactly one function, and one arg
assert len(tasktuple) == 2, "Need tuple of function, arg - it could be more flexible, but its reduced to what we need"
@@ -158,6 +163,8 @@ class WorkerThread(TerminatableThread):
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
break
# END make routine call
+ except StopProcessing:
+ break
except Exception,e:
print "%s: Task %s raised unhandled exception: %s - this really shouldn't happen !" % (self.getName(), str(tasktuple), str(e))
break # abort ...
@@ -168,5 +175,8 @@ class WorkerThread(TerminatableThread):
""":return: routine we are currently executing, or None if we have no task"""
return self._current_routine
-
+ def stop_and_join(self):
+ """Send stop message to ourselves"""
+ self.inq.put((self.stop, None))
+ super(WorkerThread, self).stop_and_join()
#} END classes
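
The resulting worker loop, reduced to its essentials, looks roughly like the sketch below. This is a simplified rendering in current Python, not the module's exact code, and it omits the termination-request check and name handling of the real TerminatableThread base.

import threading
import queue

class StopProcessing(Exception):
    """Raised in a routine processed by a WorkerThread to terminate the thread."""

class WorkerThread(threading.Thread):
    def __init__(self, inq=None):
        super().__init__()
        self.inq = inq if inq is not None else queue.Queue()
        self._current_routine = None   # routine we execute right now

    @classmethod
    def stop(cls, *args):
        """When put onto a thread's inq, stops that thread once it is processed."""
        raise StopProcessing

    def run(self):
        gettask = self.inq.get
        while True:
            # block until a task arrives; termination is sent as (stop, arg)
            routine, arg = gettask()
            self._current_routine = routine
            try:
                routine(arg)
            except StopProcessing:
                break
            except Exception as e:
                print("%s: task raised %r - this really shouldn't happen!" % (self.name, e))
                break
        self._current_routine = None

    def stop_and_join(self):
        """Send the stop message to ourselves, then wait for the thread to end."""
        self.inq.put((self.stop, None))
        self.join()

Note that stop_and_join() works even though get() blocks indefinitely, because the stop request is delivered as a regular queue item rather than via a polled flag.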
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 6d09de59..f3213ed6 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -240,7 +240,6 @@ class AsyncQueue(Queue):
# END handle block
# can throw if we woke up because we are not writable anymore
- print len(q), id(q), current_thread()
try:
return q.popleft()
except IndexError: