diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:49:20 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:49:20 +0200 |
commit | 3e2ba9c2028f21d11988558f3557905d21e93808 (patch) | |
tree | ac2426a03dbc538fb970cd4fd22a404edb68ce53 /lib/git/async/task.py | |
parent | be06e87433685b5ea9cfcc131ab89c56cf8292f2 (diff) | |
parent | 898d47d1711accdfded8ee470520fdb96fb12d46 (diff) | |
download | gitpython-3e2ba9c2028f21d11988558f3557905d21e93808.tar.gz |
Merge branch 'stasks' into async
Diffstat (limited to 'lib/git/async/task.py')
-rw-r--r-- | lib/git/async/task.py | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done + '_scheduled_items', # amount of scheduled items that will be processed in total + '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False + self._scheduled_items = 0 + self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" self._done = False + self._scheduled_items = 0 self._out_wc = wc def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc + def add_scheduled_items(self, count): + """Add the given amount of scheduled items to this task""" + self._slock.acquire() + self._scheduled_items += count + self._slock.release() + + def scheduled_item_count(self): + """:return: amount of scheduled items for this task""" + self._slock.acquire() + try: + return self._scheduled_items + finally: + self._slock.release() + # END threadsafe return + def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node): wc = self._out_wc if self.apply_single: for item in items: - wc.write(self.fun(item)) + rval = self.fun(item) + # decrement afterwards, the its unscheduled once its produced + self._slock.acquire() + self._scheduled_items -= 1 + self._slock.release() + wc.write(rval) # END for each item else: - wc.write(self.fun(items)) + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + for rval in rvals: + wc.write(rval) # END handle single apply except Exception, e: self._exc = e self.set_done() + # unschedule all, we don't know how many have been produced actually + # but only if we don't apply single please + if not self.apply_single: + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + # END unschedule all # END exception handling del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask): # for each task, which would allow to precisely determine whether # the pool as to be triggered, and bail out early. Problem would # be the + # * Perhaps one shouldn't seek the perfect solution , but instead + # document whats working and what not, or under which conditions. + # The whole system is simple, but gets more complicated the + # smarter it wants to be. if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read |