diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:20:37 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:28:01 +0200 |
commit | def0f73989047c4ddf9b11da05ad2c9c8e387331 (patch) | |
tree | 0b5afab413885df75a31d36d430a5a5323aaeaa9 /lib/git/async | |
parent | be06e87433685b5ea9cfcc131ab89c56cf8292f2 (diff) | |
download | gitpython-def0f73989047c4ddf9b11da05ad2c9c8e387331.tar.gz |
introduced a new counter keeping track of the scheduled tasks - this prevent unnecessary tasks to be scheduled as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performns well in multithreaded mode. Performance of the master queue is still a huge issue, its currently the limiting factor, as bypassing the master queue in serial moode gives 15x performance, wich is what I would need
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/pool.py | 15 | ||||
-rw-r--r-- | lib/git/async/task.py | 47 | ||||
-rw-r--r-- | lib/git/async/util.py | 2 |
3 files changed, 59 insertions, 5 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 19fc9f6e..4c97feb0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,12 +80,13 @@ class RPoolChannel(RChannel): # * have_enough = False if count > 0: - have_enough = self._wc._queue.qsize() >= count - # END risky game + have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # END ########## prepare ############################## if not have_enough: self._pool._prepare_channel_read(self._task, count) + # END prepare pool scheduling ####### read data ######## @@ -260,26 +261,33 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): + # schedule them as early as we know about them + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): + task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: + task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -348,6 +356,9 @@ class Pool(object): self._workers.append(worker) # END for each new worker to create elif cur_count > size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. del_count = cur_count - size for i in range(del_count): self._workers[i].stop_and_join() diff --git a/lib/git/async/task.py b/lib/git/async/task.py index 4e8aef54..cf486f48 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -21,6 +21,8 @@ class OutputChannelTask(Node): '_out_wc', # output write channel '_exc', # exception caught '_done', # True if we are done + '_scheduled_items', # amount of scheduled items that will be processed in total + '_slock', # lock for scheduled items 'fun', # function to call with items read 'min_count', # minimum amount of items to produce, None means no override 'max_chunksize', # maximium amount of items to process per process call @@ -33,6 +35,8 @@ class OutputChannelTask(Node): self._out_wc = None # to be set later self._exc = None self._done = False + self._scheduled_items = 0 + self._slock = threading.Lock() self.fun = fun self.min_count = None self.max_chunksize = 0 # note set @@ -50,6 +54,7 @@ class OutputChannelTask(Node): """Set the write channel to the given one :note: resets it done state in order to allow proper queue handling""" self._done = False + self._scheduled_items = 0 self._out_wc = wc def close(self): @@ -65,6 +70,21 @@ class OutputChannelTask(Node): """:return: Exception caught during last processing or None""" return self._exc + def add_scheduled_items(self, count): + """Add the given amount of scheduled items to this task""" + self._slock.acquire() + self._scheduled_items += count + self._slock.release() + + def scheduled_item_count(self): + """:return: amount of scheduled items for this task""" + self._slock.acquire() + try: + return self._scheduled_items + finally: + self._slock.release() + # END threadsafe return + def process(self, count=0): """Process count items and send the result individually to the output channel""" items = self._read(count) @@ -78,14 +98,33 @@ class OutputChannelTask(Node): wc = self._out_wc if self.apply_single: for item in items: - wc.write(self.fun(item)) + rval = self.fun(item) + # decrement afterwards, the its unscheduled once its produced + self._slock.acquire() + self._scheduled_items -= 1 + self._slock.release() + wc.write(rval) # END for each item else: - wc.write(self.fun(items)) + # shouldn't apply single be the default anyway ? + # The task designers should chunk them up in advance + rvals = self.fun(items) + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + for rval in rvals: + wc.write(rval) # END handle single apply except Exception, e: self._exc = e self.set_done() + # unschedule all, we don't know how many have been produced actually + # but only if we don't apply single please + if not self.apply_single: + self._slock.acquire() + self._scheduled_items -= len(items) + self._slock.release() + # END unschedule all # END exception handling del(wc) @@ -189,6 +228,10 @@ class InputChannelTask(OutputChannelTask): # for each task, which would allow to precisely determine whether # the pool as to be triggered, and bail out early. Problem would # be the + # * Perhaps one shouldn't seek the perfect solution , but instead + # document whats working and what not, or under which conditions. + # The whole system is simple, but gets more complicated the + # smarter it wants to be. if isinstance(self._in_rc, RPoolChannel) and self._in_rc._pool is self._pool_ref(): self._read = self._in_rc._read diff --git a/lib/git/async/util.py b/lib/git/async/util.py index 85d44694..6bd8a4e8 100644 --- a/lib/git/async/util.py +++ b/lib/git/async/util.py @@ -59,7 +59,7 @@ class SyncQueue(deque): class HSCondition(_Condition): """An attempt to make conditions less blocking, which gains performance in return by sleeping less""" - delay = 0.00005 # reduces wait times, but increases overhead + delay = 0.0001 # reduces wait times, but increases overhead def wait(self, timeout=None): waiter = Lock() |