Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  57
1 files changed, 15 insertions(+), 42 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf1c2199..fce5e424 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -78,14 +78,17 @@ class RPoolChannel(RChannel):
# have an item, but its currently being produced by some worker.
# This is why we:
# * make no assumptions if there are multiple consumers
- # *
- have_enough = False
+ # *
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ skip_compute = self._task.is_done() or self._task.error()
#if count > 0:
- # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
- if not have_enough:
+ if not skip_compute:
self._pool._prepare_channel_read(self._task, count)
# END prepare pool scheduling
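For orientation, here is a small, self-contained sketch of the read path this hunk changes. The class names and attributes below (_task, _pool, the item list standing in for the channel queue) are simplified stand-ins, not the real git.async types; the point is only that a read skips pool scheduling once the producing task is done or failed, because everything it will ever produce is already in the channel.

class ToyTask(object):
    """Simplified stand-in for a pool task."""
    def __init__(self):
        self.done = False
        self.failed = False
    def is_done(self):
        return self.done
    def error(self):
        return self.failed

class ToyReadChannel(object):
    """Simplified stand-in for RPoolChannel; a plain list plays the queue."""
    def __init__(self, task, pool, items):
        self._task = task
        self._pool = pool
        self._items = list(items)
    def read(self, count):
        # the core of the change: a finished or failed task cannot produce
        # anything new, so there is no point asking the pool for more work
        skip_compute = self._task.is_done() or self._task.error()
        if not skip_compute:
            self._pool.prepare_channel_read(self._task, count)
        out, self._items = self._items[:count], self._items[count:]
        return out

t = ToyTask()
t.done = True
rc = ToyReadChannel(t, pool=None, items=[1, 2, 3])
print(rc.read(2))   # [1, 2], without ever touching the (absent) pool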
@@ -134,7 +137,6 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
@@ -157,7 +159,6 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = None
self._workers = list()
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
@@ -224,8 +225,10 @@ class Pool(object):
# requested one last
for task in dfirst_tasks:
if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- continue
+ # In theory, there should never be a consumed task in the pool, right?
+ # They delete themselves once they are done.
+ raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
+ #continue
# END skip processing
# if the task does not have the required output on its queue, schedule
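A tiny sketch of the invariant this hunk enforces, with toy stand-ins rather than the real task types: once tasks delete themselves on completion, meeting a finished task during scheduling indicates a bug rather than something to silently skip. The _DoneTask class and schedule() helper below are illustrative only.

class _DoneTask(object):
    """Toy task that has already produced all of its output."""
    def is_done(self):
        return True
    def error(self):
        return None

def schedule(dfirst_tasks):
    for task in dfirst_tasks:
        if task.error() or task.is_done():
            # post-change behaviour: finished tasks are expected to have
            # removed themselves from the pool already
            raise AssertionError("consumed task still registered in the pool")
        # ... otherwise enqueue enough work items to satisfy the read ...

try:
    schedule([_DoneTask()])
except AssertionError as exc:
    print(exc)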
@@ -297,26 +300,8 @@ class Pool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
- # check whether we consumed the task, and schedule it for deletion
- # This could have happend after the read returned ( even though the pre-read
- # checks it as well )
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- # END handle consumption
-
- self._handle_consumed_tasks()
-
- def _handle_consumed_tasks(self):
- """Remove all consumed tasks from our queue by deleting them"""
- try:
- while True:
- ct = self._consumed_tasks.get(False)
- self.del_task(ct)
- # END for each task to delete
- except Empty:
- pass
- # END pop queue empty
-
+ pass
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
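The refcount comment above is easier to follow with a concrete example. The sketch below only demonstrates the sys.getrefcount arithmetic it alludes to; the Task class here is a bare stand-in, not the real task type.

import sys

class Task(object):
    pass

task = Task()
task.wc = object()        # pretend this is the task's write channel

# one reference because the channel is stored on the task, one for the
# temporary reference created by passing it to getrefcount() itself
if sys.getrefcount(task.wc) <= 2:
    print("orphaned: nobody else can ever read this task's output")

reader = task.wc          # a consumer now holds the channel as well
print(sys.getrefcount(task.wc))   # 3: task, reader, getrefcount argument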
@@ -347,11 +332,6 @@ class Pool(object):
# ourselves
cur_count = len(self._workers)
if cur_count < size:
- # make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._consumed_tasks, self.TaskQueueCls):
- self._consumed_tasks = Queue()
- # END init queue
-
for i in range(size - cur_count):
worker = self.WorkerCls(self._queue)
worker.start()
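For context, a hedged, self-contained sketch of what growing the pool looks like once the consumed-task queue preparation is gone: workers are simply started until the requested count is reached. threading.Thread stands in for the real WorkerCls, and the sentinel-based _drain loop is an assumption of this example, not the library's worker logic.

import threading
import queue

def set_size(workers, task_queue, size):
    cur_count = len(workers)
    if cur_count < size:
        # nothing to prepare any more: just start the missing workers
        for _ in range(size - cur_count):
            w = threading.Thread(target=_drain, args=(task_queue,))
            w.daemon = True
            w.start()
            workers.append(w)
    return workers

def _drain(task_queue):
    while True:
        job = task_queue.get()
        if job is None:      # sentinel used by this toy example to shut down
            break
        job()                # each queued item is a callable unit of work

q = queue.Queue()
pool_workers = set_size([], q, 2)
q.put(lambda: print("work item processed"))
q.put(None); q.put(None)     # one stop sentinel per worker
for w in pool_workers:
    w.join()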
@@ -377,9 +357,6 @@ class Pool(object):
continue
# END while there are tasks on the queue
- if self._consumed_tasks and not self._consumed_tasks.empty():
- self._handle_consumed_tasks()
- # END assure consumed tasks are empty
self._consumed_tasks = SyncQueue()
# END process queue
return self
@@ -437,11 +414,7 @@ class Pool(object):
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
task.set_wc(wc)
-
- has_input_channel = isinstance(task, InputChannelTask)
- if has_input_channel:
- task.set_pool(self)
- # END init input channel task
+ task.set_pool(self)
self._taskgraph_lock.acquire()
try:
@@ -452,7 +425,7 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if has_input_channel:
+ if isinstance(task, InputChannelTask):
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
self._taskgraph_lock.acquire()
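Finally, a self-contained toy sketch of the add_task flow after this change: set_pool() is called unconditionally, and the task-graph edge is only added when the task's input channel belongs to this very pool. Every class below (ToyGraph, ToyTask, ToyPoolChannel, ToyPool) is a simplified stand-in for the real git.async types, and the locking mirrors the _taskgraph_lock usage in the diff only loosely.

import threading

class ToyGraph(object):
    def __init__(self):
        self.nodes, self.edges = [], []
    def add_node(self, node):
        self.nodes.append(node)
    def add_edge(self, u, v):
        self.edges.append((u, v))

class ToyTask(object):
    def __init__(self, in_rc=None):
        self.in_rc = in_rc          # read channel this task consumes, if any
        self.pool = None
        self.wc = None
    def set_wc(self, wc):
        self.wc = wc
    def set_pool(self, pool):
        self.pool = pool

class ToyPoolChannel(object):
    def __init__(self, task, pool):
        self._task, self._pool = task, pool

class ToyPool(object):
    def __init__(self):
        self._tasks = ToyGraph()
        self._taskgraph_lock = threading.Lock()

    def add_task(self, task):
        rc = ToyPoolChannel(task, self)
        task.set_wc(object())       # stand-in for the real write channel
        task.set_pool(self)         # unconditional after this commit
        with self._taskgraph_lock:
            self._tasks.add_node(task)
        # record the producer/consumer relation if the input channel is ours
        ic = task.in_rc
        if isinstance(ic, ToyPoolChannel) and ic._pool is self:
            with self._taskgraph_lock:
                self._tasks.add_edge(ic._task, task)
        return rc

pool = ToyPool()
producer_rc = pool.add_task(ToyTask())
pool.add_task(ToyTask(in_rc=producer_rc))
print(pool._tasks.edges)            # one (producer, consumer) edge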