diff options
Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r-- | lib/git/async/pool.py | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 19fc9f6e..4c97feb0 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -80,12 +80,13 @@ class RPoolChannel(RChannel): # * have_enough = False if count > 0: - have_enough = self._wc._queue.qsize() >= count - # END risky game + have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count + # END ########## prepare ############################## if not have_enough: self._pool._prepare_channel_read(self._task, count) + # END prepare pool scheduling ####### read data ######## @@ -260,26 +261,33 @@ class Pool(object): queue = self._queue if numchunks > 1: for i in xrange(numchunks): + # schedule them as early as we know about them + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END for each chunk to put else: + task.add_scheduled_items(chunksize) queue.put((task.process, chunksize)) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) queue.put((task.process, remainder)) # END handle chunksize else: # no workers, so we have to do the work ourselves if numchunks > 1: for i in xrange(numchunks): + task.add_scheduled_items(chunksize) task.process(chunksize) # END for each chunk to put else: + task.add_scheduled_items(chunksize) task.process(chunksize) # END try efficient looping if remainder: + task.add_scheduled_items(remainder) task.process(remainder) # END handle chunksize # END handle serial mode @@ -348,6 +356,9 @@ class Pool(object): self._workers.append(worker) # END for each new worker to create elif cur_count > size: + # we can safely increase the size, even from serial mode, as we would + # only be able to do this if the serial ( sync ) mode finished processing. + # Just adding more workers is not a problem at all. del_count = cur_count - size for i in range(del_count): self._workers[i].stop_and_join() |