diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 10:38:22 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 10:38:22 +0200 |
commit | a8a448b7864e21db46184eab0f0a21d7725d074f (patch) | |
tree | e8bd0c6a27d4c611a0a380dabf894f67b8907566 /lib/git/async/pool.py | |
parent | 6a252661c3bf4202a4d571f9c41d2afa48d9d75f (diff) | |
download | gitpython-a8a448b7864e21db46184eab0f0a21d7725d074f.tar.gz |
pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools
Reduced waiting time in tests to make them complete faster
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index fcb0f442..5518e37e 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -1,7 +1,7 @@ """Implementation of a thread-pool working with channels""" from thread import WorkerThread from task import InputChannelTask -from Queue import Queue +from Queue import Queue, Empty from graph import ( Graph, @@ -103,14 +103,14 @@ class ThreadPool(object): used only from the main thread, hence you cannot consume their results from multiple threads unless you use a task for it.""" __slots__ = ( '_tasks', # a graph of tasks - '_consumed_tasks', # a list with tasks that are done or had an error + '_consumed_tasks', # a queue with tasks that are done or had an error '_workers', # list of worker threads '_queue', # master queue for tasks ) def __init__(self, size=0): self._tasks = Graph() - self._consumed_tasks = list() + self._consumed_tasks = Queue() # make sure its threadsafe self._workers = list() self._queue = Queue() self.set_size(size) @@ -123,7 +123,7 @@ class ThreadPool(object): """Walk the graph and find tasks that are done for later cleanup, and queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) return True # END stop processing @@ -206,16 +206,21 @@ class ThreadPool(object): def _post_channel_read(self, task): """Called after we processed a read to cleanup""" # check whether we consumed the task, and schedule it for deletion + # This could have happend after the read returned ( even though the pre-read + # checks it as well ) if task.error() or task.is_done(): - self._consumed_tasks.append(task) + self._consumed_tasks.put(task) # END handle consumption # delete consumed tasks to cleanup - for task in self._consumed_tasks: - self.del_task(task) - # END for each task to delete - - del(self._consumed_tasks[:]) + try: + while True: + ct = self._consumed_tasks.get(False) + self.del_task(ct) + # END for each task to delete + except Empty: + pass + # END pop queue empty def _del_task_if_orphaned(self, task): """Check the task, and delete it if it is orphaned""" @@ -236,7 +241,9 @@ class ThreadPool(object): :return: self :param size: if 0, the pool will do all work itself in the calling thread, - otherwise the work will be distributed among the given amount of threads""" + otherwise the work will be distributed among the given amount of threads + + :note: currently NOT threadsafe !""" # either start new threads, or kill existing ones. # If we end up with no threads, we process the remaining chunks on the queue # ourselves |