diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 12:48:25 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 12:48:25 +0200 |
commit | ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 (patch) | |
tree | 90003f8f93becbb0b8aacd4c2ff7119842fa8003 /lib/git/async/pool.py | |
parent | b72e2704022d889f116e49abf3e1e5d3e3192d3b (diff) | |
download | gitpython-ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2.tar.gz |
thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system
graph: implemented it including test according to the pools requirements
pool: implemented set_pool_size
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 53 |
1 files changed, 48 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 7798d3d4..9a24cbc5 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -25,7 +25,8 @@ class TaskNode(Node): '_out_wc', # output write channel '_pool_ref', # ref to our pool '_exc', # exception caught - 'fun', # function to call with items read from in_rc + 'fun', # function to call with items read from in_rc + 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call 'apply_single' # apply single items even if multiple where read ) @@ -36,6 +37,7 @@ class TaskNode(Node): self._pool_ref = None self._exc = None self.fun = fun + self.min_count = None self.max_chunksize = 0 # note set self.apply_single = apply_single @@ -174,6 +176,12 @@ class ThreadPool(object): if task.error() or task.is_done(): self._consumed_tasks.append(task) + # allow min-count override. This makes sure we take at least min-count + # items off the input queue ( later ) + if task.min_count is not None: + count = task.min_count + # END handle min-count + # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. @@ -213,7 +221,7 @@ class ThreadPool(object): Tasks which are not done will be put onto the queue for processing, which is fine as we walked them depth-first.""" - self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) + self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count)) # delete consumed tasks to cleanup for task in self._consumed_tasks: @@ -233,7 +241,9 @@ class ThreadPool(object): """Delete the task Additionally we will remove orphaned tasks, which can be identified if their output channel is only held by themselves, so no one will ever consume - its items.""" + its items. + + :return: self""" # now delete our actual node - must set it done os it closes its channels. # Otherwise further reads of output tasks will block. # Actually they may still block if anyone wants to read all ... without @@ -246,12 +256,45 @@ class ThreadPool(object): for t in in_tasks self._del_task_if_orphaned(t) # END handle orphans recursively + + return self def set_pool_size(self, size=0): - """Set the amount of workers to use in this pool. + """Set the amount of workers to use in this pool. When reducing the size, + the call may block as it waits for threads to finish. + When reducing the size to zero, this thread will process all remaining + items on the queue. + + :return: self :param size: if 0, the pool will do all work itself in the calling thread, otherwise the work will be distributed among the given amount of threads""" - raise NotImplementedError() + # either start new threads, or kill existing ones. + # If we end up with no threads, we process the remaining chunks on the queue + # ourselves + cur_count = len(self._workers) + if cur_count < size: + for i in range(size - cur_count): + worker = WorkerThread(self._queue) + self._workers.append(worker) + # END for each new worker to create + elif cur_count > size: + del_count = cur_count - size + for i in range(del_count): + self._workers[i].stop_and_join() + # END for each thread to stop + del(self._workers[:del_count]) + # END handle count + + if size == 0: + while not self._queue.empty(): + try: + taskmethod, count = self._queue.get(False) + taskmethod(count) + except Queue.Empty: + continue + # END while there are tasks on the queue + # END process queue + return self def add_task(self, task): """Add a new task to be processed. |