summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py57
-rw-r--r--lib/git/async/task.py22
2 files changed, 31 insertions, 48 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf1c2199..fce5e424 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -78,14 +78,17 @@ class RPoolChannel(RChannel):
# have an item, but its currently being produced by some worker.
# This is why we:
# * make no assumptions if there are multiple consumers
- # *
- have_enough = False
+ # *
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ skip_compute = self._task.is_done() or self._task.error()
#if count > 0:
- # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
- if not have_enough:
+ if not skip_compute:
self._pool._prepare_channel_read(self._task, count)
# END prepare pool scheduling
@@ -134,7 +137,6 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
@@ -157,7 +159,6 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = None
self._workers = list()
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
@@ -224,8 +225,10 @@ class Pool(object):
# requested one last
for task in dfirst_tasks:
if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- continue
+ # in theory, the should never be consumed task in the pool, right ?
+ # They delete themselves once they are done.
+ raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
+ #continue
# END skip processing
# if the task does not have the required output on its queue, schedule
@@ -297,26 +300,8 @@ class Pool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
- # check whether we consumed the task, and schedule it for deletion
- # This could have happend after the read returned ( even though the pre-read
- # checks it as well )
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- # END handle consumption
-
- self._handle_consumed_tasks()
-
- def _handle_consumed_tasks(self):
- """Remove all consumed tasks from our queue by deleting them"""
- try:
- while True:
- ct = self._consumed_tasks.get(False)
- self.del_task(ct)
- # END for each task to delete
- except Empty:
- pass
- # END pop queue empty
-
+ pass
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
@@ -347,11 +332,6 @@ class Pool(object):
# ourselves
cur_count = len(self._workers)
if cur_count < size:
- # make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._consumed_tasks, self.TaskQueueCls):
- self._consumed_tasks = Queue()
- # END init queue
-
for i in range(size - cur_count):
worker = self.WorkerCls(self._queue)
worker.start()
@@ -377,9 +357,6 @@ class Pool(object):
continue
# END while there are tasks on the queue
- if self._consumed_tasks and not self._consumed_tasks.empty():
- self._handle_consumed_tasks()
- # END assure consumed tasks are empty
self._consumed_tasks = SyncQueue()
# END process queue
return self
@@ -437,11 +414,7 @@ class Pool(object):
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
task.set_wc(wc)
-
- has_input_channel = isinstance(task, InputChannelTask)
- if has_input_channel:
- task.set_pool(self)
- # END init input channel task
+ task.set_pool(self)
self._taskgraph_lock.acquire()
try:
@@ -452,7 +425,7 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if has_input_channel:
+ if isinstance(task, InputChannelTask):
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
self._taskgraph_lock.acquire()
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 97521cae..dc207c33 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,6 +23,7 @@ class OutputChannelTask(Node):
'_done', # True if we are done
'_scheduled_items', # amount of scheduled items that will be processed in total
'_slock', # lock for scheduled items
+ '_pool_ref', # to be set by Pool
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -84,6 +85,10 @@ class OutputChannelTask(Node):
finally:
self._slock.release()
# END threadsafe return
+
+ def set_pool(self, pool):
+ """Set our pool to the given one, it will be weakref'd"""
+ self._pool_ref = weakref.ref(pool)
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
@@ -147,6 +152,16 @@ class OutputChannelTask(Node):
# + 1 for the instance we provide to refcount
if self.is_done() and sys.getrefcount(self._out_wc) < 4:
self.close()
+ # additionally, remove ourselves from the pool, this is thread-safe
+ # Previously the pool collected done tasks and removed them,
+ # but this could happen after a read finished, potentially
+ # leaving them on the queue until the read-handle was dropped.
+ # This should assure its more in-time.
+ # I don't like this back-ref.
+ pool = self._pool_ref()
+ if pool:
+ pool.del_task(self)
+ # END remove ourselves from the pool
# END handle channel closure
#{ Configuration
@@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask):
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
__slots__ = (
- 'in_rc', # channel to read items from
- '_pool_ref' # to be set by Pool
+ 'in_rc' # channel to read items from
)
def __init__(self, in_rc, *args, **kwargs):
@@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask):
# and call it
return OutputChannelTask.process(self, count)
-
- def set_pool(self, pool):
- """Set our pool to the given one, it will be weakref'd"""
- self._pool_ref = weakref.ref(pool)
#{ Configuration