summaryrefslogtreecommitdiff
path: root/lib/git/async/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--lib/git/async/pool.py29
1 files changed, 18 insertions, 11 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index fcb0f442..5518e37e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,7 +1,7 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
from task import InputChannelTask
-from Queue import Queue
+from Queue import Queue, Empty
from graph import (
Graph,
@@ -103,14 +103,14 @@ class ThreadPool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a list with tasks that are done or had an error
+ '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
)
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = list()
+ self._consumed_tasks = Queue() # make sure its threadsafe
self._workers = list()
self._queue = Queue()
self.set_size(size)
@@ -123,7 +123,7 @@ class ThreadPool(object):
"""Walk the graph and find tasks that are done for later cleanup, and
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
return True
# END stop processing
@@ -206,16 +206,21 @@ class ThreadPool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
# check whether we consumed the task, and schedule it for deletion
+ # This could have happend after the read returned ( even though the pre-read
+ # checks it as well )
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
# END handle consumption
# delete consumed tasks to cleanup
- for task in self._consumed_tasks:
- self.del_task(task)
- # END for each task to delete
-
- del(self._consumed_tasks[:])
+ try:
+ while True:
+ ct = self._consumed_tasks.get(False)
+ self.del_task(ct)
+ # END for each task to delete
+ except Empty:
+ pass
+ # END pop queue empty
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
@@ -236,7 +241,9 @@ class ThreadPool(object):
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads"""
+ otherwise the work will be distributed among the given amount of threads
+
+ :note: currently NOT threadsafe !"""
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves