Diffstat (limited to 'lib/git/async/pool.py')
-rw-r--r--  lib/git/async/pool.py  66
1 file changed, 51 insertions(+), 15 deletions(-)
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 2efc862b..620e2258 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -117,36 +117,72 @@ class ThreadPool(object):
         queue all others for processing by our worker threads ( if available )."""
         if task.error() or task.is_done():
             self._consumed_tasks.append(task)
+            return True
+        # END stop processing
 
         # allow min-count override. This makes sure we take at least min-count
         # items off the input queue ( later )
-        if task.min_count is not None:
+        if task.min_count is not None and count != 0 and count < task.min_count:
             count = task.min_count
         # END handle min-count
 
         # if the task does not have the required output on its queue, schedule
         # it for processing. If we should process all, we don't care about the
         # amount as it should process until it's all done.
-        if self._workers:
-            if count < 1 or task._out_wc.size() < count:
+        if count < 1 or task._out_wc.size() < count:
+            numchunks = 1
+            chunksize = count
+            remainder = 0
+
+            # we need the count set for this - can't chunk up unlimited items
+            # In serial mode we could do this by checking for empty input channels,
+            # but in dispatch mode it's impossible ( == not easily possible )
+            # Only try it if we have enough demand
+            if task.max_chunksize and count > task.max_chunksize:
+                numchunks = count / task.max_chunksize
+                chunksize = task.max_chunksize
+                remainder = count - (numchunks * chunksize)
+            # END handle chunking
+
+            print count, numchunks, chunksize, remainder
+            # the following loops are kind of unrolled - code duplication
+            # should make things execute faster. Putting the if statements
+            # into the loop would be less code, but ... slower
+            if self._workers:
                 # respect the chunk size, and split the task up if we want
                 # to process too much. This can be defined per task
                 queue = self._queue
-                if task.max_chunksize:
-                    chunksize = count / task.max_chunksize
-                    remainder = count - (chunksize * task.max_chunksize)
-                    for i in xrange(chunksize):
+                if numchunks > 1:
+                    for i in xrange(numchunks):
                         queue.put((task.process, chunksize))
-                    if remainder:
-                        queue.put((task.process, remainder))
+                    # END for each chunk to put
+                else:
+                    queue.put((task.process, chunksize))
+                # END try efficient looping
+
+                if remainder:
+                    queue.put((task.process, remainder))
+                # END handle chunksize
+            else:
+                # no workers, so we have to do the work ourselves
+                if numchunks > 1:
+                    for i in xrange(numchunks):
+                        task.process(chunksize)
+                    # END for each chunk to put
                 else:
-                    self._queue.put((task.process, count))
+                    task.process(chunksize)
+                # END try efficient looping
+
+                if remainder:
+                    task.process(remainder)
                 # END handle chunksize
-            # END handle queuing
-        else:
-            # no workers, so we have to do the work ourselves
-            task.process(count)
-        # END handle serial mode
+
+                # as we are serial, we can check for consumption right away
+                if task.error() or task.is_done():
+                    self._consumed_tasks.append(task)
+                # END handle consumption
+            # END handle serial mode
+        # END handle queuing
 
         # always walk the whole graph, we want to find consumed tasks
         return True
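
For reference, the chunking arithmetic introduced above splits a read of count items into numchunks full chunks of task.max_chunksize plus one remainder chunk, so no requested items are dropped. Below is a minimal standalone sketch of that scheme in Python 3; the helper names (split_into_chunks, dispatch) are illustrative stand-ins, not part of pool.py, and '//' stands in for the integer '/' of the original Python 2 code:

    from queue import Queue

    def split_into_chunks(count, max_chunksize):
        # Mirrors the numchunks/chunksize/remainder computation in the patch.
        numchunks = 1
        chunksize = count
        remainder = 0
        if max_chunksize and count > max_chunksize:
            numchunks = count // max_chunksize
            chunksize = max_chunksize
            remainder = count - (numchunks * chunksize)
        return numchunks, chunksize, remainder

    def dispatch(process, count, max_chunksize, queue=None):
        # With a worker queue, enqueue (process, size) pairs for the workers;
        # without one, run each chunk inline, as in the serial branch.
        numchunks, chunksize, remainder = split_into_chunks(count, max_chunksize)
        sizes = [chunksize] * numchunks
        if remainder:
            sizes.append(remainder)
        for size in sizes:
            if queue is not None:
                queue.put((process, size))
            else:
                process(size)

    # e.g. count=10, max_chunksize=4 -> two chunks of 4 and a remainder of 2
    assert split_into_chunks(10, 4) == (2, 4, 2)
    # no max_chunksize set -> a single chunk covering the whole count
    assert split_into_chunks(10, 0) == (1, 10, 0)

    q = Queue()
    dispatch(print, 10, 4, queue=q)  # enqueues (print, 4), (print, 4), (print, 2)

Note that the sketch folds the patch's deliberately unrolled branches back into a single loop for brevity; the patch duplicates the loop bodies so the worker and chunk-count checks happen once per call rather than once per chunk. The remainder chunk matters because integer division discards count - numchunks * chunksize items, which would otherwise never be scheduled.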