summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-08 17:25:43 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-08 17:25:43 +0200
commit13dd59ba5b3228820841682b59bad6c22476ff66 (patch)
tree7b324d19f031112c78cafea469ed10ed689c57e1 /lib/git/async
parent619c11787742ce00a0ee8f841cec075897873c79 (diff)
downloadgitpython-13dd59ba5b3228820841682b59bad6c22476ff66.tar.gz
task: now deletes itself once its done - for the test this doesn't change a thing as the task deletes itself too late - its time for a paradigm change, the task should be deleted with its RPoolChannel or explicitly by the user. The test needs to adapt, and shouldn't assume anything unless the RPoolChannel is gone
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py57
-rw-r--r--lib/git/async/task.py22
2 files changed, 31 insertions, 48 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index cf1c2199..fce5e424 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -78,14 +78,17 @@ class RPoolChannel(RChannel):
# have an item, but its currently being produced by some worker.
# This is why we:
# * make no assumptions if there are multiple consumers
- # *
- have_enough = False
+ # *
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ skip_compute = self._task.is_done() or self._task.error()
#if count > 0:
- # have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
# END
########## prepare ##############################
- if not have_enough:
+ if not skip_compute:
self._pool._prepare_channel_read(self._task, count)
# END prepare pool scheduling
@@ -134,7 +137,6 @@ class Pool(object):
used only from the main thread, hence you cannot consume their results
from multiple threads unless you use a task for it."""
__slots__ = ( '_tasks', # a graph of tasks
- '_consumed_tasks', # a queue with tasks that are done or had an error
'_workers', # list of worker threads
'_queue', # master queue for tasks
'_taskorder_cache', # map task id -> ordered dependent tasks
@@ -157,7 +159,6 @@ class Pool(object):
def __init__(self, size=0):
self._tasks = Graph()
- self._consumed_tasks = None
self._workers = list()
self._queue = self.TaskQueueCls()
self._taskgraph_lock = self.LockCls()
@@ -224,8 +225,10 @@ class Pool(object):
# requested one last
for task in dfirst_tasks:
if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- continue
+ # in theory, the should never be consumed task in the pool, right ?
+ # They delete themselves once they are done.
+ raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
+ #continue
# END skip processing
# if the task does not have the required output on its queue, schedule
@@ -297,26 +300,8 @@ class Pool(object):
def _post_channel_read(self, task):
"""Called after we processed a read to cleanup"""
- # check whether we consumed the task, and schedule it for deletion
- # This could have happend after the read returned ( even though the pre-read
- # checks it as well )
- if task.error() or task.is_done():
- self._consumed_tasks.put(task)
- # END handle consumption
-
- self._handle_consumed_tasks()
-
- def _handle_consumed_tasks(self):
- """Remove all consumed tasks from our queue by deleting them"""
- try:
- while True:
- ct = self._consumed_tasks.get(False)
- self.del_task(ct)
- # END for each task to delete
- except Empty:
- pass
- # END pop queue empty
-
+ pass
+
def _del_task_if_orphaned(self, task):
"""Check the task, and delete it if it is orphaned"""
# 1 as its stored on the task, 1 for the getrefcount call
@@ -347,11 +332,6 @@ class Pool(object):
# ourselves
cur_count = len(self._workers)
if cur_count < size:
- # make sure we have a real queue, and can store our consumed tasks properly
- if not isinstance(self._consumed_tasks, self.TaskQueueCls):
- self._consumed_tasks = Queue()
- # END init queue
-
for i in range(size - cur_count):
worker = self.WorkerCls(self._queue)
worker.start()
@@ -377,9 +357,6 @@ class Pool(object):
continue
# END while there are tasks on the queue
- if self._consumed_tasks and not self._consumed_tasks.empty():
- self._handle_consumed_tasks()
- # END assure consumed tasks are empty
self._consumed_tasks = SyncQueue()
# END process queue
return self
@@ -437,11 +414,7 @@ class Pool(object):
wc, rc = Channel()
rc = RPoolChannel(wc, task, self)
task.set_wc(wc)
-
- has_input_channel = isinstance(task, InputChannelTask)
- if has_input_channel:
- task.set_pool(self)
- # END init input channel task
+ task.set_pool(self)
self._taskgraph_lock.acquire()
try:
@@ -452,7 +425,7 @@ class Pool(object):
# END sync task addition
# If the input channel is one of our read channels, we add the relation
- if has_input_channel:
+ if isinstance(task, InputChannelTask):
ic = task.in_rc
if isinstance(ic, RPoolChannel) and ic._pool is self:
self._taskgraph_lock.acquire()
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 97521cae..dc207c33 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,6 +23,7 @@ class OutputChannelTask(Node):
'_done', # True if we are done
'_scheduled_items', # amount of scheduled items that will be processed in total
'_slock', # lock for scheduled items
+ '_pool_ref', # to be set by Pool
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -84,6 +85,10 @@ class OutputChannelTask(Node):
finally:
self._slock.release()
# END threadsafe return
+
+ def set_pool(self, pool):
+ """Set our pool to the given one, it will be weakref'd"""
+ self._pool_ref = weakref.ref(pool)
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
@@ -147,6 +152,16 @@ class OutputChannelTask(Node):
# + 1 for the instance we provide to refcount
if self.is_done() and sys.getrefcount(self._out_wc) < 4:
self.close()
+ # additionally, remove ourselves from the pool, this is thread-safe
+ # Previously the pool collected done tasks and removed them,
+ # but this could happen after a read finished, potentially
+ # leaving them on the queue until the read-handle was dropped.
+ # This should assure its more in-time.
+ # I don't like this back-ref.
+ pool = self._pool_ref()
+ if pool:
+ pool.del_task(self)
+ # END remove ourselves from the pool
# END handle channel closure
#{ Configuration
@@ -204,8 +219,7 @@ class InputChannelTask(OutputChannelTask):
For instantiation, it takes all arguments of its base, the first one needs
to be the input channel to read from though."""
__slots__ = (
- 'in_rc', # channel to read items from
- '_pool_ref' # to be set by Pool
+ 'in_rc' # channel to read items from
)
def __init__(self, in_rc, *args, **kwargs):
@@ -242,9 +256,5 @@ class InputChannelTask(OutputChannelTask):
# and call it
return OutputChannelTask.process(self, count)
-
- def set_pool(self, pool):
- """Set our pool to the given one, it will be weakref'd"""
- self._pool_ref = weakref.ref(pool)
#{ Configuration