summaryrefslogtreecommitdiff
path: root/lib/git/async/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r--lib/git/async/task.py47
1 files changed, 45 insertions, 2 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 4e8aef54..cf486f48 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -21,6 +21,8 @@ class OutputChannelTask(Node):
'_out_wc', # output write channel
'_exc', # exception caught
'_done', # True if we are done
+ '_scheduled_items', # amount of scheduled items that will be processed in total
+ '_slock', # lock for scheduled items
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -33,6 +35,8 @@ class OutputChannelTask(Node):
self._out_wc = None # to be set later
self._exc = None
self._done = False
+ self._scheduled_items = 0
+ self._slock = threading.Lock()
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # note set
@@ -50,6 +54,7 @@ class OutputChannelTask(Node):
"""Set the write channel to the given one
:note: resets it done state in order to allow proper queue handling"""
self._done = False
+ self._scheduled_items = 0
self._out_wc = wc
def close(self):
@@ -65,6 +70,21 @@ class OutputChannelTask(Node):
""":return: Exception caught during last processing or None"""
return self._exc
+ def add_scheduled_items(self, count):
+ """Add the given amount of scheduled items to this task"""
+ self._slock.acquire()
+ self._scheduled_items += count
+ self._slock.release()
+
+ def scheduled_item_count(self):
+ """:return: amount of scheduled items for this task"""
+ self._slock.acquire()
+ try:
+ return self._scheduled_items
+ finally:
+ self._slock.release()
+ # END threadsafe return
+
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
@@ -78,14 +98,33 @@ class OutputChannelTask(Node):
wc = self._out_wc
if self.apply_single:
for item in items:
- wc.write(self.fun(item))
+ rval = self.fun(item)
+ # decrement afterwards, the its unscheduled once its produced
+ self._slock.acquire()
+ self._scheduled_items -= 1
+ self._slock.release()
+ wc.write(rval)
# END for each item
else:
- wc.write(self.fun(items))
+ # shouldn't apply single be the default anyway ?
+ # The task designers should chunk them up in advance
+ rvals = self.fun(items)
+ self._slock.acquire()
+ self._scheduled_items -= len(items)
+ self._slock.release()
+ for rval in rvals:
+ wc.write(rval)
# END handle single apply
except Exception, e:
self._exc = e
self.set_done()
+ # unschedule all, we don't know how many have been produced actually
+ # but only if we don't apply single please
+ if not self.apply_single:
+ self._slock.acquire()
+ self._scheduled_items -= len(items)
+ self._slock.release()
+ # END unschedule all
# END exception handling
del(wc)
@@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask):
# for each task, which would allow to precisely determine whether
# the pool as to be triggered, and bail out early. Problem would
# be the
+ # * Perhaps one shouldn't seek the perfect solution , but instead
+ # document whats working and what not, or under which conditions.
+ # The whole system is simple, but gets more complicated the
+ # smarter it wants to be.
if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
self._read = self._in_rc._read