author     Sebastian Thiel <byronimo@gmail.com>   2010-06-09 15:40:51 +0200
committer  Sebastian Thiel <byronimo@gmail.com>   2010-06-09 15:40:51 +0200
commit     07996a1a1e53ffdd2680d4bfbc2f4059687859a5 (patch)
tree       ae3e8d6fc5f322fb633bc8c8ef45a1eccb2a605b /lib/git
parent     57a4e09294230a36cc874a6272c71757c48139f2 (diff)
download   gitpython-07996a1a1e53ffdd2680d4bfbc2f4059687859a5.tar.gz
task: removed scheduled task support; it was introduced at some point to improve performance, but now hinders it, besides being unnecessary ;)
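For orientation before the hunks below: the "scheduled task support" being removed is a lock-guarded counter on OutputChannelTask (`_scheduled_items` protected by `_slock`, filled via `add_scheduled_items()` and queried via `scheduled_item_count()`), which the pool updated for every chunk it queued and which the task decremented for items it produced. The following is a minimal sketch of that removed pattern, with a hypothetical wrapper class name used purely for illustration; it shows where the lock traffic accumulates on the hot path.

    import threading

    # Sketch of the removed bookkeeping (attribute and method names taken
    # from the diff below; the wrapper class itself is hypothetical):
    # every scheduled chunk and every produced item costs a lock round-trip.
    class ScheduledItemCounter(object):
        def __init__(self):
            self._scheduled_items = 0
            self._slock = threading.Lock()

        def add_scheduled_items(self, count):
            # called by the pool for each chunk put onto the worker queue
            self._slock.acquire()
            try:
                self._scheduled_items += count
            finally:
                self._slock.release()

        def scheduled_item_count(self):
            # consulted when deciding whether a read needs more computation
            self._slock.acquire()
            try:
                return self._scheduled_items
            finally:
                self._slock.release()

        def unschedule(self, n=1):
            # the hot path: in apply_single mode this ran once per produced
            # item, so workers repeatedly contend on the lock
            self._slock.acquire()
            try:
                self._scheduled_items -= n
            finally:
                self._slock.release()

Per-item lock traffic of this kind is what the commit message calls a hindrance; dropping the counter also removes the pool's `task._slock = DummyLock()` special case for serial mode.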
Diffstat (limited to 'lib/git')
-rw-r--r--  lib/git/async/pool.py | 43
-rw-r--r--  lib/git/async/task.py | 33
2 files changed, 11 insertions, 65 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 549c801e..284c41c7 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,27 +80,21 @@ class RPoolChannel(RChannel):
             self._pre_cb()
         # END pre callback
         
-        # if we have count items, don't do any queue preparation - if someone
-        # depletes the queue in the meanwhile, the channel will close and
-        # we will unblock naturally
-        # PROBLEM: If there are multiple consumer of this channel, we might
-        # run out of items without being replenished == block forever in the
-        # worst case. task.min_count could have triggered to produce more ...
-        # usually per read with n items, we put n items on to the queue,
-        # so we wouldn't check this
-        # Even if we have just one consumer ( we could determine that with
-        # the reference count ), it could be that in one moment we don't yet
-        # have an item, but its currently being produced by some worker.
-        # This is why we:
-        # * make no assumptions if there are multiple consumers
-        # *
+        # NOTE: we always queue the operation that would give us count items
+        # as tracking the scheduled items or testing the channels size
+        # is in herently unsafe depending on the design of the task network
+        # If we put on tasks onto the queue for every request, we are sure
+        # to always produce enough items, even if the task.min_count actually
+        # provided enough - its better to have some possibly empty task runs
+        # than having and empty queue that blocks.
+        
+        # NOTE: TODO: that case is only possible if one Task could be connected
+        # to multiple input channels in a manner known by the system. Currently
+        # this is not possible, but should be implemented at some point
         
         # if the user tries to use us to read from a done task, we will never
         # compute as all produced items are already in the channel
         skip_compute = self._task.is_done() or self._task.error()
-        #if count > 0:
-        #    skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
-        # END
         
         ########## prepare ##############################
         if not skip_compute:
@@ -249,13 +243,6 @@ class Pool(object):
                 # raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themeselves, what happend ?")
             # END skip processing
             
-            # if the task does not have the required output on its queue, schedule
-            # it for processing. If we should process all, we don't care about the
-            # amount as it should process until its all done.
-            #if count > 1 and task._out_wc.size() >= count:
-            #    continue
-            # END skip if we have enough
-            
             # but use the actual count to produce the output, we may produce
             # more than requested
             numchunks = 1
@@ -283,33 +270,26 @@ class Pool(object):
                 queue = self._queue
                 if numchunks > 1:
                     for i in xrange(numchunks):
-                        # schedule them as early as we know about them
-                        task.add_scheduled_items(chunksize)
                         queue.put((task.process, chunksize))
                     # END for each chunk to put
                 else:
-                    task.add_scheduled_items(chunksize)
                     queue.put((task.process, chunksize))
                 # END try efficient looping
                 
                 if remainder:
-                    task.add_scheduled_items(remainder)
                     queue.put((task.process, remainder))
                 # END handle chunksize
             else:
                 # no workers, so we have to do the work ourselves
                 if numchunks > 1:
                     for i in xrange(numchunks):
-                        task.add_scheduled_items(chunksize)
                         task.process(chunksize)
                     # END for each chunk to put
                 else:
-                    task.add_scheduled_items(chunksize)
                     task.process(chunksize)
                 # END try efficient looping
                 
                 if remainder:
-                    task.add_scheduled_items(remainder)
                     task.process(remainder)
                 # END handle chunksize
             # END handle serial mode
@@ -452,7 +432,6 @@ class Pool(object):
         # This brings about 15% more performance, but sacrifices thread-safety
         # when reading from multiple threads.
         if self.size() == 0:
-            task._slock = DummyLock()
             wctype = SerialWChannel
         # END improve locks
         
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 539b240f..be02cfe8 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,8 +23,6 @@ class OutputChannelTask(Node):
                 '_out_wc',              # output write channel
                 '_exc',                 # exception caught
                 '_done',                # True if we are done
-                '_scheduled_items',     # amount of scheduled items that will be processed in total
-                '_slock',               # lock for scheduled items
                 'fun',                  # function to call with items read
                 'min_count',            # minimum amount of items to produce, None means no override
                 'max_chunksize',        # maximium amount of items to process per process call
@@ -37,8 +35,6 @@ class OutputChannelTask(Node):
         self._out_wc = None             # to be set later
         self._exc = None
         self._done = False
-        self._scheduled_items = 0
-        self._slock = threading.Lock()
         self.fun = fun
         self.min_count = None
         self.max_chunksize = 0          # note set
@@ -72,21 +68,6 @@ class OutputChannelTask(Node):
         """:return: Exception caught during last processing or None"""
         return self._exc
     
-    def add_scheduled_items(self, count):
-        """Add the given amount of scheduled items to this task"""
-        self._slock.acquire()
-        self._scheduled_items += count
-        self._slock.release()
-        
-    def scheduled_item_count(self):
-        """:return: amount of scheduled items for this task"""
-        self._slock.acquire()
-        try:
-            return self._scheduled_items
-        finally:
-            self._slock.release()
-        # END threadsafe return
-        
     def process(self, count=0):
         """Process count items and send the result individually to the output channel"""
         items = self._read(count)
@@ -101,19 +82,12 @@ class OutputChannelTask(Node):
             if self.apply_single:
                 for item in items:
                     rval = self.fun(item)
-                    # decrement afterwards, the its unscheduled once its produced
-                    self._slock.acquire()
-                    self._scheduled_items -= 1
-                    self._slock.release()
                     wc.write(rval)
                 # END for each item
             else:
                 # shouldn't apply single be the default anyway ?
                 # The task designers should chunk them up in advance
                 rvals = self.fun(items)
-                self._slock.acquire()
-                self._scheduled_items -= len(items)
-                self._slock.release()
                 for rval in rvals:
                     wc.write(rval)
             # END handle single apply
@@ -122,13 +96,6 @@ class OutputChannelTask(Node):
             
             # be sure our task is not scheduled again
             self.set_done()
-            # unschedule all, we don't know how many have been produced actually
-            # but only if we don't apply single please
-            if not self.apply_single:
-                self._slock.acquire()
-                self._scheduled_items -= len(items)
-                self._slock.release()
-            # END unschedule all
            
             # PROBLEM: We have failed to create at least one item, hence its not
             # garantueed that enough items will be produced for a possibly blocking
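With the counter gone, the read path keeps only the strategy described in the new NOTE in pool.py: always enqueue enough `task.process` calls to cover the requested count, accepting the occasional empty run rather than risking an empty output queue that blocks readers. Below is a condensed sketch of that retained scheduling logic; the helper name and its standalone form are hypothetical, since the pool inlines this in its read preparation.

    def schedule_chunks(task, queue, count):
        # Always queue enough work to yield `count` items; no scheduled-item
        # bookkeeping or queue-size check is consulted any more.
        if count < 1:
            return
        chunksize = task.max_chunksize or count      # 0 means no chunking
        numchunks, remainder = divmod(count, chunksize)
        for _ in range(numchunks):
            queue.put((task.process, chunksize))
        if remainder:
            queue.put((task.process, remainder))

In the serial branch (a pool with no worker threads) the same loop calls `task.process(chunksize)` directly instead of queuing it, mirroring the else-branch of the pool.py hunk above.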