summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-07 10:38:22 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 10:38:22 +0200
commita8a448b7864e21db46184eab0f0a21d7725d074f (patch)
treee8bd0c6a27d4c611a0a380dabf894f67b8907566 /lib/git/async
parent6a252661c3bf4202a4d571f9c41d2afa48d9d75f (diff)
downloadgitpython-a8a448b7864e21db46184eab0f0a21d7725d074f.tar.gz
pool.consumed_tasks: is now a queue to be thread safe, in preparation for multiple connected pools
Reduced waiting time in tests to make them complete faster
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py29
-rw-r--r--lib/git/async/thread.py5
2 files changed, 23 insertions, 11 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index fcb0f442..5518e37e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,7 +1,7 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
from task import InputChannelTask
-from Queue import Queue
+from Queue import Queue, Empty
from graph import (
Graph,
@@ -103,14 +103,14 @@ class ThreadPool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a list with tasks that are done or had an error
+ '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
)
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = list()
+ self._consumed_tasks = Queue() # make sure its threadsafe
self._workers = list()
self._queue = Queue()
self.set_size(size)
@@ -123,7 +123,7 @@ class ThreadPool(object):
"""Walk the graph and find tasks that are done for later cleanup, and
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
return True
# END stop processing
@@ -206,16 +206,21 @@ class ThreadPool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
# check whether we consumed the task, and schedule it for deletion
+ # This could have happend after the read returned ( even though the pre-read
+ # checks it as well )
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
# END handle consumption
# delete consumed tasks to cleanup
- for task in self._consumed_tasks:
- self.del_task(task)
- # END for each task to delete
-
- del(self._consumed_tasks[:])
+ try:
+ while True:
+ ct = self._consumed_tasks.get(False)
+ self.del_task(ct)
+ # END for each task to delete
+ except Empty:
+ pass
+ # END pop queue empty
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
@@ -236,7 +241,9 @@ class ThreadPool(object):
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads"""
+ otherwise the work will be distributed among the given amount of threads
+
+ :note: currently NOT threadsafe !"""
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 0292289d..2ed002e9 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -115,6 +115,11 @@ class WorkerThread(TerminatableThread):
"""
__slots__ = ('inq', 'outq')
+
+ # define how often we should check for a shutdown request in case our
+ # taskqueue is empty
+ shutdown_check_time_s = 0.5
+
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()