author     Sebastian Thiel <byronimo@gmail.com>  2010-06-08 19:25:33 +0200
committer  Sebastian Thiel <byronimo@gmail.com>  2010-06-08 19:25:33 +0200
commit     772b95631916223e472989b43f3a31f61e237f31 (patch)
tree       cac3b488a05502b15532b07bb3848e0fdb2df339 /lib/git/async/pool.py
parent     e5c0002d069382db1768349bf0c5ff40aafbf140 (diff)
workerthread: adjusted to use a blocking queue; it will receive termination events only via its queue, which boosts performance into bright green levels
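The change described here is the classic stop-sentinel pattern: a worker blocks on its input queue, and a termination request arrives as just another queued item, so no polling timeout is needed. Below is a minimal sketch of that pattern in Python 2 (matching the code base); WorkerThread and StopProcessing mirror the names imported in the diff, but the bodies are illustrative, not the repository's actual thread.py:

    import threading
    import Queue

    class StopProcessing(Exception):
        """Raised inside a worker to make it leave its processing loop."""

    class WorkerThread(threading.Thread):
        """Blocks on its input queue; termination arrives as a queued item."""
        def __init__(self, inq):
            threading.Thread.__init__(self)
            self.inq = inq

        @classmethod
        def stop(cls, arg):
            # Enqueued like any other task; raising unwinds the run() loop.
            raise StopProcessing

        def run(self):
            while True:
                routine, arg = self.inq.get()  # blocking get, no busy-waiting
                try:
                    routine(arg)
                except StopProcessing:
                    break
            # END worker loop

    inq = Queue.Queue()
    WorkerThread(inq).start()
    inq.put((WorkerThread.stop, True))  # the worker exits after dequeuing this

Blocking on the queue is what turns performance "bright green": the worker sleeps in the OS until an item arrives, instead of waking on a timeout to check a flag.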
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  56
1 file changed, 33 insertions, 23 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index a915f7b0..1767c61c 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,5 +1,8 @@
"""Implementation of a thread-pool working with channels"""
-from thread import WorkerThread
+from thread import (
+ WorkerThread,
+ StopProcessing,
+ )
from threading import Lock
from util import (
@@ -147,7 +150,7 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_workers', # list of worker threads
+ '_num_workers', # number of workers
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
'_taskgraph_lock', # lock for accessing the task graph
@@ -169,7 +172,7 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._workers = list()
+ self._num_workers = 0
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
self._taskorder_cache = dict()
@@ -270,7 +273,7 @@ class Pool(object):
# into the loop would be less code, but ... slower
# DEBUG
# print actual_count, numchunks, chunksize, remainder, task._out_wc.size()
- if self._workers:
+ if self._num_workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
queue = self._queue
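The DEBUG line above names the quantities involved when a task is split up: actual_count items are divided into numchunks chunks of chunksize, plus a remainder. A hypothetical helper showing the arithmetic those names imply (the real computation lives outside this hunk):

    def split_into_chunks(actual_count, chunksize):
        """Return per-chunk item counts: full chunks plus one remainder chunk."""
        numchunks = actual_count / chunksize  # integer division under Python 2
        remainder = actual_count - numchunks * chunksize
        chunks = [chunksize] * numchunks
        if remainder:
            chunks.append(remainder)
        return chunks

    print split_into_chunks(10, 4)  # prints [4, 4, 2]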
@@ -323,7 +326,7 @@ class Pool(object):
#{ Interface
def size(self):
""":return: amount of workers in the pool"""
- return len(self._workers)
+ return self._num_workers
def set_size(self, size=0):
"""Set the amount of workers to use in this pool. When reducing the size,
@@ -341,34 +344,41 @@ class Pool(object):
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
- cur_count = len(self._workers)
+ cur_count = self._num_workers
if cur_count < size:
- for i in range(size - cur_count):
- worker = self.WorkerCls(self._queue)
- worker.start()
- self._workers.append(worker)
- # END for each new worker to create
- elif cur_count > size:
# we can safely increase the size, even from serial mode, as we could
# only get here once the serial (sync) mode finished processing.
# Just adding more workers is not a problem at all.
+ add_count = size - cur_count
+ for i in range(add_count):
+ print "Add worker"
+ self.WorkerCls(self._queue).start()
+ # END for each new worker to create
+ self._num_workers += add_count
+ elif cur_count > size:
+ # We don't care exactly which thread gets hit by our stop request.
+ # On their way out, the workers will consume remaining tasks, but new
+ # ones could be added as we speak.
del_count = cur_count - size
for i in range(del_count):
- self._workers[i].stop_and_join()
+ print "stop worker"
+ self._queue.put((self.WorkerCls.stop, True)) # arg doesn't matter
# END for each thread to stop
- del(self._workers[:del_count])
+ self._num_workers -= del_count
# END handle count
if size == 0:
- while not self._queue.empty():
- try:
- taskmethod, count = self._queue.get(False)
- taskmethod(count)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
-
- self._consumed_tasks = SyncQueue()
+ # NOTE: we do not process any tasks still on the queue, as we will
+ # naturally do that the next time we read, and only for the tasks
+ # that are actually required. The queue will keep the tasks,
+ # and once we are deleted, they will vanish without additional
+ # time spent on them - that is, assuming there shouldn't be any
+ # consumers anyway.
+ # If we should re-enable some workers again, they will continue on the
+ # remaining tasks, probably with nothing to do.
+ # We can't clear the task queue if we have removed workers, as they
+ # will receive the termination signal through it, and if we had
+ # added workers, we wouldn't be here ;).
+ pass
# END process queue
return self
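With this change, shrinking the pool is cooperative: each (WorkerCls.stop, True) token terminates exactly one worker, whichever dequeues it first, while growing simply starts fresh threads. A hypothetical usage sketch of the resulting semantics, assuming (as the __init__ above suggests) that Pool(size=N) ends up calling set_size(N):

    pool = Pool(size=4)  # four workers blocking on the master queue
    pool.set_size(2)     # enqueues two stop tokens; two workers exit on dequeue
    pool.set_size(0)     # serial mode: queued chunks are handled lazily on read
    pool.set_size(3)     # growing again is safe; new workers pick up pending tasks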