path: root/lib/git/async/pool.py
author    Sebastian Thiel <byronimo@gmail.com>  2010-06-06 12:48:25 +0200
committer Sebastian Thiel <byronimo@gmail.com>  2010-06-06 12:48:25 +0200
commit    ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 (patch)
tree      90003f8f93becbb0b8aacd4c2ff7119842fa8003 /lib/git/async/pool.py
parent    b72e2704022d889f116e49abf3e1e5d3e3192d3b (diff)
download  gitpython-ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2.tar.gz
thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system
graph: implemented it including test according to the pool's requirements
pool: implemented set_pool_size
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  53
1 file changed, 48 insertions(+), 5 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7798d3d4..9a24cbc5 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,7 +25,8 @@ class TaskNode(Node):
'_out_wc', # output write channel
'_pool_ref', # ref to our pool
'_exc', # exception caught
- 'fun', # function to call with items read from in_rc
+ 'fun', # function to call with items read from in_rc
+ 'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximum amount of items to process per process call
'apply_single' # apply single items even if multiple were read
)
@@ -36,6 +37,7 @@ class TaskNode(Node):
self._pool_ref = None
self._exc = None
self.fun = fun
+ self.min_count = None
self.max_chunksize = 0 # not set
self.apply_single = apply_single
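
The new min_count slot defaults to None, meaning no override. A minimal sketch of how a caller might configure it (the construction below is hypothetical; the real TaskNode may take more arguments than shown):

    # hypothetical task setup; only min_count/max_chunksize come from this commit
    task = TaskNode(fun=lambda items: [i * 2 for i in items])
    task.min_count = 10     # scheduler should pull at least 10 items at a time
    task.max_chunksize = 4  # but hand them to fun in chunks of at most 4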
@@ -174,6 +176,12 @@ class ThreadPool(object):
if task.error() or task.is_done():
self._consumed_tasks.append(task)
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue (later)
+ if task.min_count is not None:
+ count = task.min_count
+ # END handle min-count
+
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until it's all done.
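
The feeder hunk above replaces the requested count outright with the task's min_count; a standalone sketch of that logic (the function name is hypothetical):

    def effective_count(requested, task):
        # mirrors the hunk above: a non-None min_count overrides the
        # requested amount before items are scheduled off the input queue
        if task.min_count is not None:
            return task.min_count
        return requested

So a read of a single item from a task with min_count = 10 still schedules ten items, keeping the worker queue fed in batches.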
@@ -213,7 +221,7 @@ class ThreadPool(object):
Tasks which are not done will be put onto the queue for processing, which
is fine as we walked them depth-first."""
- self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
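
The commit message notes the graph side now implements this traversal ("graph: implemented it including test"). A minimal standalone sketch of what an inclusive depth-first input visit might look like (the node layout and the in_nodes attribute are assumptions):

    def visit_input_inclusive_depth_first(node, visitor, visited=None):
        # visit all input (producer) nodes depth-first, then the node
        # itself, so producers are handled before their consumers
        if visited is None:
            visited = set()
        if node in visited:
            return
        visited.add(node)
        for in_node in node.in_nodes:
            visit_input_inclusive_depth_first(in_node, visitor, visited)
        visitor(node)

"Inclusive" is what distinguishes it from the previous visit_input_depth_first call: the starting task itself is passed to the visitor as well.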
@@ -233,7 +241,9 @@ class ThreadPool(object):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
- its items."""
+ its items.
+
+ :return: self"""
# now delete our actual node - must set it done so it closes its channels.
# Otherwise further reads of output tasks will block.
# Actually they may still block if anyone wants to read all ... without
@@ -246,12 +256,45 @@ class ThreadPool(object):
for t in in_tasks:
self._del_task_if_orphaned(t)
# END handle orphans recursively
+
+ return self
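
With the new return value, deletions can be chained with other pool calls (task names hypothetical):

    pool.del_task(finished_task).del_task(stale_task)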
def set_pool_size(self, size=0):
- """Set the amount of workers to use in this pool.
+ """Set the amount of workers to use in this pool. When reducing the size,
+ the call may block as it waits for threads to finish.
+ When reducing the size to zero, this thread will process all remaining
+ items on the queue.
+
+ :return: self
:param size: if 0, the pool will do all work itself in the calling thread,
otherwise the work will be distributed among the given number of threads"""
- raise NotImplementedError()
+ # either start new threads, or kill existing ones.
+ # If we end up with no threads, we process the remaining chunks on the queue
+ # ourselves
+ cur_count = len(self._workers)
+ if cur_count < size:
+ for i in range(size - cur_count):
+ worker = WorkerThread(self._queue)
+ self._workers.append(worker)
+ # END for each new worker to create
+ elif cur_count > size:
+ del_count = cur_count - size
+ for i in range(del_count):
+ self._workers[i].stop_and_join()
+ # END for each thread to stop
+ del(self._workers[:del_count])
+ # END handle count
+
+ if size == 0:
+ while not self._queue.empty():
+ try:
+ taskmethod, count = self._queue.get(False)
+ taskmethod(count)
+ except Queue.Empty:
+ continue
+ # END while there are tasks on the queue
+ # END process queue
+ return self
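
A usage sketch of the resize semantics implemented above (the construction is hypothetical; WorkerThread and the shared queue come from the surrounding module):

    pool = ThreadPool()
    pool.set_pool_size(4)   # spawns 4 WorkerThreads reading from the shared queue
    pool.set_pool_size(1)   # blocks until 3 workers have stopped and joined
    pool.set_pool_size(0)   # no workers left: the calling thread drains the
                            # queue itself, as in the loop above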
def add_task(self, task):
"""Add a new task to be processed.