summaryrefslogtreecommitdiff
path: root/lib/git
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-09 15:40:51 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-09 15:40:51 +0200
commit07996a1a1e53ffdd2680d4bfbc2f4059687859a5 (patch)
treeae3e8d6fc5f322fb633bc8c8ef45a1eccb2a605b /lib/git
parent57a4e09294230a36cc874a6272c71757c48139f2 (diff)
downloadgitpython-07996a1a1e53ffdd2680d4bfbc2f4059687859a5.tar.gz
task: removed scheduled task support, which at some point was introduced to improve performance, but which now hinders performance, besides being unnecessary ;)
Diffstat (limited to 'lib/git')
-rw-r--r--lib/git/async/pool.py43
-rw-r--r--lib/git/async/task.py33
2 files changed, 11 insertions, 65 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 549c801e..284c41c7 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,27 +80,21 @@ class RPoolChannel(RChannel):
self._pre_cb()
# END pre callback
- # if we have count items, don't do any queue preparation - if someone
- # depletes the queue in the meanwhile, the channel will close and
- # we will unblock naturally
- # PROBLEM: If there are multiple consumer of this channel, we might
- # run out of items without being replenished == block forever in the
- # worst case. task.min_count could have triggered to produce more ...
- # usually per read with n items, we put n items on to the queue,
- # so we wouldn't check this
- # Even if we have just one consumer ( we could determine that with
- # the reference count ), it could be that in one moment we don't yet
- # have an item, but its currently being produced by some worker.
- # This is why we:
- # * make no assumptions if there are multiple consumers
- # *
+ # NOTE: we always queue the operation that would give us count items
+ # as tracking the scheduled items or testing the channels size
+ # is inherently unsafe depending on the design of the task network
+ # If we put tasks onto the queue for every request, we are sure
+ # to always produce enough items, even if the task.min_count actually
+ # provided enough - it's better to have some possibly empty task runs
+ # than having an empty queue that blocks.
+
+ # NOTE: TODO: that case is only possible if one Task could be connected
+ # to multiple input channels in a manner known by the system. Currently
+ # this is not possible, but should be implemented at some point
# if the user tries to use us to read from a done task, we will never
# compute as all produced items are already in the channel
skip_compute = self._task.is_done() or self._task.error()
- #if count > 0:
- # skip_compute = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
- # END
########## prepare ##############################
if not skip_compute:
@@ -249,13 +243,6 @@ class Pool(object):
# raise AssertionError("Shouldn't have consumed tasks on the pool, they delete themselves, what happened?")
# END skip processing
- # if the task does not have the required output on its queue, schedule
- # it for processing. If we should process all, we don't care about the
- # amount as it should process until its all done.
- #if count > 1 and task._out_wc.size() >= count:
- # continue
- # END skip if we have enough
-
# but use the actual count to produce the output, we may produce
# more than requested
numchunks = 1
@@ -283,33 +270,26 @@ class Pool(object):
queue = self._queue
if numchunks > 1:
for i in xrange(numchunks):
- # schedule them as early as we know about them
- task.add_scheduled_items(chunksize)
queue.put((task.process, chunksize))
# END for each chunk to put
else:
- task.add_scheduled_items(chunksize)
queue.put((task.process, chunksize))
# END try efficient looping
if remainder:
- task.add_scheduled_items(remainder)
queue.put((task.process, remainder))
# END handle chunksize
else:
# no workers, so we have to do the work ourselves
if numchunks > 1:
for i in xrange(numchunks):
- task.add_scheduled_items(chunksize)
task.process(chunksize)
# END for each chunk to put
else:
- task.add_scheduled_items(chunksize)
task.process(chunksize)
# END try efficient looping
if remainder:
- task.add_scheduled_items(remainder)
task.process(remainder)
# END handle chunksize
# END handle serial mode
@@ -452,7 +432,6 @@ class Pool(object):
# This brings about 15% more performance, but sacrifices thread-safety
# when reading from multiple threads.
if self.size() == 0:
- task._slock = DummyLock()
wctype = SerialWChannel
# END improve locks
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 539b240f..be02cfe8 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -23,8 +23,6 @@ class OutputChannelTask(Node):
'_out_wc', # output write channel
'_exc', # exception caught
'_done', # True if we are done
- '_scheduled_items', # amount of scheduled items that will be processed in total
- '_slock', # lock for scheduled items
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximum amount of items to process per process call
@@ -37,8 +35,6 @@ class OutputChannelTask(Node):
self._out_wc = None # to be set later
self._exc = None
self._done = False
- self._scheduled_items = 0
- self._slock = threading.Lock()
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # not set
@@ -72,21 +68,6 @@ class OutputChannelTask(Node):
""":return: Exception caught during last processing or None"""
return self._exc
- def add_scheduled_items(self, count):
- """Add the given amount of scheduled items to this task"""
- self._slock.acquire()
- self._scheduled_items += count
- self._slock.release()
-
- def scheduled_item_count(self):
- """:return: amount of scheduled items for this task"""
- self._slock.acquire()
- try:
- return self._scheduled_items
- finally:
- self._slock.release()
- # END threadsafe return
-
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
@@ -101,19 +82,12 @@ class OutputChannelTask(Node):
if self.apply_single:
for item in items:
rval = self.fun(item)
- # decrement afterwards, the its unscheduled once its produced
- self._slock.acquire()
- self._scheduled_items -= 1
- self._slock.release()
wc.write(rval)
# END for each item
else:
# shouldn't apply single be the default anyway ?
# The task designers should chunk them up in advance
rvals = self.fun(items)
- self._slock.acquire()
- self._scheduled_items -= len(items)
- self._slock.release()
for rval in rvals:
wc.write(rval)
# END handle single apply
@@ -122,13 +96,6 @@ class OutputChannelTask(Node):
# be sure our task is not scheduled again
self.set_done()
- # unschedule all, we don't know how many have been produced actually
- # but only if we don't apply single please
- if not self.apply_single:
- self._slock.acquire()
- self._scheduled_items -= len(items)
- self._slock.release()
- # END unschedule all
# PROBLEM: We have failed to create at least one item, hence its not
# guaranteed that enough items will be produced for a possibly blocking