summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2010-06-07 23:20:37 +0200
committerSebastian Thiel <byronimo@gmail.com>2010-06-07 23:28:01 +0200
commitdef0f73989047c4ddf9b11da05ad2c9c8e387331 (patch)
tree0b5afab413885df75a31d36d430a5a5323aaeaa9 /lib/git/async
parentbe06e87433685b5ea9cfcc131ab89c56cf8292f2 (diff)
downloadgitpython-def0f73989047c4ddf9b11da05ad2c9c8e387331.tar.gz
introduced a new counter keeping track of the scheduled tasks - this prevents unnecessary tasks from being scheduled, as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performs well in multithreaded mode. Performance of the master queue is still a huge issue; it is currently the limiting factor, as bypassing the master queue in serial mode gives 15x performance, which is what I would need
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/pool.py15
-rw-r--r--lib/git/async/task.py47
-rw-r--r--lib/git/async/util.py2
3 files changed, 59 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 19fc9f6e..4c97feb0 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -80,12 +80,13 @@ class RPoolChannel(RChannel):
# *
have_enough = False
if count > 0:
- have_enough = self._wc._queue.qsize() >= count
- # END risky game
+ have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+ # END
########## prepare ##############################
if not have_enough:
self._pool._prepare_channel_read(self._task, count)
+ # END prepare pool scheduling
####### read data ########
@@ -260,26 +261,33 @@ class Pool(object):
queue = self._queue
if numchunks > 1:
for i in xrange(numchunks):
+ # schedule them as early as we know about them
+ task.add_scheduled_items(chunksize)
queue.put((task.process, chunksize))
# END for each chunk to put
else:
+ task.add_scheduled_items(chunksize)
queue.put((task.process, chunksize))
# END try efficient looping
if remainder:
+ task.add_scheduled_items(remainder)
queue.put((task.process, remainder))
# END handle chunksize
else:
# no workers, so we have to do the work ourselves
if numchunks > 1:
for i in xrange(numchunks):
+ task.add_scheduled_items(chunksize)
task.process(chunksize)
# END for each chunk to put
else:
+ task.add_scheduled_items(chunksize)
task.process(chunksize)
# END try efficient looping
if remainder:
+ task.add_scheduled_items(remainder)
task.process(remainder)
# END handle chunksize
# END handle serial mode
@@ -348,6 +356,9 @@ class Pool(object):
self._workers.append(worker)
# END for each new worker to create
elif cur_count > size:
+ # we can safely increase the size, even from serial mode, as we would
+ # only be able to do this if the serial ( sync ) mode finished processing.
+ # Just adding more workers is not a problem at all.
del_count = cur_count - size
for i in range(del_count):
self._workers[i].stop_and_join()
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index 4e8aef54..cf486f48 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -21,6 +21,8 @@ class OutputChannelTask(Node):
'_out_wc', # output write channel
'_exc', # exception caught
'_done', # True if we are done
+ '_scheduled_items', # amount of scheduled items that will be processed in total
+ '_slock', # lock for scheduled items
'fun', # function to call with items read
'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
@@ -33,6 +35,8 @@ class OutputChannelTask(Node):
self._out_wc = None # to be set later
self._exc = None
self._done = False
+ self._scheduled_items = 0
+ self._slock = threading.Lock()
self.fun = fun
self.min_count = None
self.max_chunksize = 0 # note set
@@ -50,6 +54,7 @@ class OutputChannelTask(Node):
"""Set the write channel to the given one
:note: resets it done state in order to allow proper queue handling"""
self._done = False
+ self._scheduled_items = 0
self._out_wc = wc
def close(self):
@@ -65,6 +70,21 @@ class OutputChannelTask(Node):
""":return: Exception caught during last processing or None"""
return self._exc
+ def add_scheduled_items(self, count):
+ """Add the given amount of scheduled items to this task"""
+ self._slock.acquire()
+ self._scheduled_items += count
+ self._slock.release()
+
+ def scheduled_item_count(self):
+ """:return: amount of scheduled items for this task"""
+ self._slock.acquire()
+ try:
+ return self._scheduled_items
+ finally:
+ self._slock.release()
+ # END threadsafe return
+
def process(self, count=0):
"""Process count items and send the result individually to the output channel"""
items = self._read(count)
@@ -78,14 +98,33 @@ class OutputChannelTask(Node):
wc = self._out_wc
if self.apply_single:
for item in items:
- wc.write(self.fun(item))
+ rval = self.fun(item)
+ # decrement afterwards, the its unscheduled once its produced
+ self._slock.acquire()
+ self._scheduled_items -= 1
+ self._slock.release()
+ wc.write(rval)
# END for each item
else:
- wc.write(self.fun(items))
+ # shouldn't apply single be the default anyway ?
+ # The task designers should chunk them up in advance
+ rvals = self.fun(items)
+ self._slock.acquire()
+ self._scheduled_items -= len(items)
+ self._slock.release()
+ for rval in rvals:
+ wc.write(rval)
# END handle single apply
except Exception, e:
self._exc = e
self.set_done()
+ # unschedule all, we don't know how many have been produced actually
+ # but only if we don't apply single please
+ if not self.apply_single:
+ self._slock.acquire()
+ self._scheduled_items -= len(items)
+ self._slock.release()
+ # END unschedule all
# END exception handling
del(wc)
@@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask):
# for each task, which would allow to precisely determine whether
# the pool as to be triggered, and bail out early. Problem would
# be the
+ # * Perhaps one shouldn't seek the perfect solution , but instead
+ # document whats working and what not, or under which conditions.
+ # The whole system is simple, but gets more complicated the
+ # smarter it wants to be.
if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref():
self._read = self._in_rc._read
diff --git a/lib/git/async/util.py b/lib/git/async/util.py
index 85d44694..6bd8a4e8 100644
--- a/lib/git/async/util.py
+++ b/lib/git/async/util.py
@@ -59,7 +59,7 @@ class SyncQueue(deque):
class HSCondition(_Condition):
"""An attempt to make conditions less blocking, which gains performance
in return by sleeping less"""
- delay = 0.00005 # reduces wait times, but increases overhead
+ delay = 0.0001 # reduces wait times, but increases overhead
def wait(self, timeout=None):
waiter = Lock()