diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:20:37 +0200 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-07 23:28:01 +0200 |
commit | def0f73989047c4ddf9b11da05ad2c9c8e387331 (patch) | |
tree | 0b5afab413885df75a31d36d430a5a5323aaeaa9 /lib/git/async/pool.py | |
parent | be06e87433685b5ea9cfcc131ab89c56cf8292f2 (diff) | |
download | gitpython-def0f73989047c4ddf9b11da05ad2c9c8e387331.tar.gz |
Introduced a new counter keeping track of the scheduled tasks - this prevents unnecessary tasks from being scheduled, as we keep track of how many items will be produced for the task at hand. This introduces additional locking, but performs well in multithreaded mode. Performance of the master queue is still a huge issue; it's currently the limiting factor, as bypassing the master queue in serial mode gives 15x performance, which is what I would need.
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 19fc9f6e..4c97feb0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,12 +80,13 @@ class RPoolChannel(RChannel): # * have_enough = False if count > 0: - have_enough = self._wc._queue.qsize() >= count - # END risky game + have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # END ########## prepare ############################## if not have_enough: self._pool._prepare_channel_read(self._task, count) + # END prepare pool scheduling ####### read data ######## @@ -260,26 +261,33 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): + # schedule them as early as we know about them + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): + task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: + task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -348,6 +356,9 @@ class Pool(object): self._workers.append(worker) # END for each new worker to create elif cur_count > size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. del_count = cur_count - size for i in range(del_count): self._workers[i].stop_and_join() |