Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--    lib/git/async/pool.py    21
1 file changed, 19 insertions, 2 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 19fc9f6e..d6b5711d 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -5,6 +5,7 @@ from threading import Lock
from util import (
    SyncQueue,
    AsyncQueue,
+    DummyLock
    )

from task import InputChannelTask
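
The newly imported DummyLock is what add_task swaps in further down when the pool runs in serial mode. As a rough sketch, assuming the real class in lib/git/async/util.py follows the plain acquire/release interface of threading.Lock (the actual implementation there may differ), it could look like this:

class DummyLock(object):
    """No-op stand-in for a real lock - safe when only a single thread ever
    touches the protected state, as is the case in serial (size 0) mode."""
    def acquire(self, *args):
        pass
    def release(self):
        pass
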
@@ -80,12 +81,13 @@ class RPoolChannel(RChannel):
        # *
        have_enough = False
        if count > 0:
-            have_enough = self._wc._queue.qsize() >= count
-        # END risky game
+            have_enough = self._task.scheduled_item_count() >= count or self._wc._queue.qsize() >= count
+        # END

        ########## prepare ##############################
        if not have_enough:
            self._pool._prepare_channel_read(self._task, count)
+        # END prepare pool scheduling

        ####### read data ########
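
The changed condition means the channel now treats work that is merely scheduled on the pool as if it were already in the queue, so it avoids asking _prepare_channel_read for items that are about to arrive anyway. A small sketch of that decision, assuming scheduled_item_count() reports items handed to the pool but not yet written to the channel (names mirror the diff, not the full API):

def have_enough_items(task, queue, count):
    """Return True if no extra scheduling round is needed for a read of `count`.
    A non-positive count never counts as 'enough', matching the diff above."""
    if count <= 0:
        return False
    return task.scheduled_item_count() >= count or queue.qsize() >= count
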
@@ -260,26 +262,33 @@ class Pool(object):
            queue = self._queue
            if numchunks > 1:
                for i in xrange(numchunks):
+                    # schedule them as early as we know about them
+                    task.add_scheduled_items(chunksize)
                    queue.put((task.process, chunksize))
                # END for each chunk to put
            else:
+                task.add_scheduled_items(chunksize)
                queue.put((task.process, chunksize))
            # END try efficient looping

            if remainder:
+                task.add_scheduled_items(remainder)
                queue.put((task.process, remainder))
            # END handle chunksize
        else:
            # no workers, so we have to do the work ourselves
            if numchunks > 1:
                for i in xrange(numchunks):
+                    task.add_scheduled_items(chunksize)
                    task.process(chunksize)
                # END for each chunk to put
            else:
+                task.add_scheduled_items(chunksize)
                task.process(chunksize)
            # END try efficient looping

            if remainder:
+                task.add_scheduled_items(remainder)
                task.process(remainder)
            # END handle chunksize
        # END handle serial mode
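
The add_scheduled_items calls above only make sense if the counter is bumped before the corresponding chunk is queued or processed, so a concurrent reader sees the pending work through scheduled_item_count(). A minimal sketch of such a counter, assuming it is guarded by the task's _slock (the same attribute the DummyLock swap at the end of this diff targets; the real InputChannelTask may track this differently):

import threading

class ScheduledItemsSketch(object):
    """Illustrative scheduled-item bookkeeping for a task."""
    def __init__(self):
        self._slock = threading.Lock()      # becomes a DummyLock in serial mode
        self._scheduled_items = 0

    def add_scheduled_items(self, count):
        self._slock.acquire()
        try:
            self._scheduled_items += count
        finally:
            self._slock.release()

    def scheduled_item_count(self):
        return self._scheduled_items
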
@@ -348,6 +357,9 @@ class Pool(object):
                self._workers.append(worker)
            # END for each new worker to create
        elif cur_count > size:
+            # we can safely reduce the size, as each removed worker is stopped
+            # and joined below, so it finishes its current chunk first.
+            # Dropping workers is not a problem at all.
            del_count = cur_count - size
            for i in range(del_count):
                self._workers[i].stop_and_join()
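
The shrink branch relies on stop_and_join() blocking until the worker has wound down, so in-flight chunks are not lost when the pool gets smaller. A sketch of a worker loop with that contract, assuming the real WorkerThread in lib/git/async/thread.py behaves along these lines (it may differ in detail):

import threading
import Queue        # Python 2 module name, matching the xrange usage above

class WorkerSketch(threading.Thread):
    """Pulls (function, argument) chunks from a queue until asked to stop."""
    def __init__(self, inq):
        threading.Thread.__init__(self)
        self._inq = inq
        self._stop_requested = False

    def stop_and_join(self):
        self._stop_requested = True
        self.join()

    def run(self):
        while not self._stop_requested:
            try:
                fun, arg = self._inq.get(timeout=0.05)
            except Queue.Empty:
                continue
            fun(arg)    # finish the chunk that was already pulled
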
@@ -451,6 +463,11 @@ class Pool(object):
                # END add task relation
            # END handle input channels for connections

+        # fix locks - in serial mode, the task does not need real locks
+        if self.size() == 0:
+            task._slock = DummyLock()
+        # END improve locks
+
        return rc

    #} END interface
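
Taken together, the serial-mode lock swap means a size-0 pool does all work on the reading thread, so a real lock on the task would only add overhead. A self-contained sketch of the swap (FakeTask and the helper name below are illustrative, not part of the module):

import threading

class DummyLock(object):                # same shape as the sketch near the imports
    def acquire(self, *args): pass
    def release(self): pass

class FakeTask(object):
    """Hypothetical stand-in for InputChannelTask, just to show the swap."""
    def __init__(self):
        self._slock = threading.Lock()

def add_task_lock_fixup(task, pool_size):
    # mirrors the `if self.size() == 0` block added to Pool.add_task
    if pool_size == 0:
        task._slock = DummyLock()
    return task

task = add_task_lock_fixup(FakeTask(), 0)
print(type(task._slock).__name__)       # -> DummyLock: no real locking in serial mode
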