summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/async/pool.py29
-rw-r--r--lib/git/async/thread.py5
-rw-r--r--test/git/async/test_channel.py15
3 files changed, 31 insertions, 18 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index fcb0f442..5518e37e 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -1,7 +1,7 @@
"""Implementation of a thread-pool working with channels"""
from thread import WorkerThread
from task import InputChannelTask
-from Queue import Queue
+from Queue import Queue, Empty
from graph import (
Graph,
@@ -103,14 +103,14 @@ class ThreadPool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a list with tasks that are done or had an error
+ '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
)
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = list()
+ self._consumed_tasks = Queue() # make sure its threadsafe
self._workers = list()
self._queue = Queue()
self.set_size(size)
@@ -123,7 +123,7 @@ class ThreadPool(object):
"""Walk the graph and find tasks that are done for later cleanup, and
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
return True
# END stop processing
@@ -206,16 +206,21 @@ class ThreadPool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
# check whether we consumed the task, and schedule it for deletion
+ # This could have happend after the read returned ( even though the pre-read
+ # checks it as well )
if task.error() or task.is_done():
- self._consumed_tasks.append(task)
+ self._consumed_tasks.put(task)
# END handle consumption
# delete consumed tasks to cleanup
- for task in self._consumed_tasks:
- self.del_task(task)
- # END for each task to delete
-
- del(self._consumed_tasks[:])
+ try:
+ while True:
+ ct = self._consumed_tasks.get(False)
+ self.del_task(ct)
+ # END for each task to delete
+ except Empty:
+ pass
+ # END pop queue empty
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
@@ -236,7 +241,9 @@ class ThreadPool(object):
:return: self
:param size: if 0, the pool will do all work itself in the calling thread,
- otherwise the work will be distributed among the given amount of threads"""
+ otherwise the work will be distributed among the given amount of threads
+
+ :note: currently NOT threadsafe !"""
# either start new threads, or kill existing ones.
# If we end up with no threads, we process the remaining chunks on the queue
# ourselves
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 0292289d..2ed002e9 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -115,6 +115,11 @@ class WorkerThread(TerminatableThread):
"""
__slots__ = ('inq', 'outq')
+
+ # define how often we should check for a shutdown request in case our
+ # taskqueue is empty
+ shutdown_check_time_s = 0.5
+
def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
diff --git a/test/git/async/test_channel.py b/test/git/async/test_channel.py
index 6472b5b5..acfbd15e 100644
--- a/test/git/async/test_channel.py
+++ b/test/git/async/test_channel.py
@@ -22,14 +22,15 @@ class TestChannels(TestBase):
wc.write(item2)
# read all - it blocks as its still open for writing
+ to = 0.2
st = time.time()
- assert rc.read(timeout=1) == [item, item2]
- assert time.time() - st >= 1.0
+ assert rc.read(timeout=to) == [item, item2]
+ assert time.time() - st >= to
# next read blocks. it waits a second
st = time.time()
- assert len(rc.read(1, True, 1)) == 0
- assert time.time() - st >= 1.0
+ assert len(rc.read(1, True, to)) == 0
+ assert time.time() - st >= to
# writing to a closed channel raises
assert not wc.closed
@@ -50,10 +51,10 @@ class TestChannels(TestBase):
wc, rc = Channel(1)
wc.write(item) # fine
- # blocks for a second, its full
+ # blocks for a a moment, its full
st = time.time()
- self.failUnlessRaises(EOFError, wc.write, item, True, 1)
- assert time.time() - st >= 1.0
+ self.failUnlessRaises(EOFError, wc.write, item, True, to)
+ assert time.time() - st >= to
# get our only one
assert rc.read(1)[0] == item