| author | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 21:15:13 +0200 |
|---|---|---|
| committer | Sebastian Thiel <byronimo@gmail.com> | 2010-06-06 21:31:51 +0200 |
| commit | 1b27292936c81637f6b9a7141dafaad1126f268e (patch) | |
| tree | f629d098429099934a35798312e6e0660df0d677 /lib/git/async/pool.py | |
| parent | b3cde0ee162b8f0cb67da981311c8f9c16050a62 (diff) | |
| download | gitpython-1b27292936c81637f6b9a7141dafaad1126f268e.tar.gz | |
Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing
Diffstat (limited to 'lib/git/async/pool.py')
| -rw-r--r-- | lib/git/async/pool.py | 66 |
1 files changed, 51 insertions, 15 deletions
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 2efc862b..620e2258 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -117,36 +117,72 @@ class ThreadPool(object):
 		queue all others for processing by our worker threads ( if available )."""
 		if task.error() or task.is_done():
 			self._consumed_tasks.append(task)
+			return True
+		# END stop processing
 
 		# allow min-count override. This makes sure we take at least min-count
 		# items off the input queue ( later )
-		if task.min_count is not None:
+		if task.min_count is not None and count != 0 and count < task.min_count:
 			count = task.min_count
 		# END handle min-count
 
 		# if the task does not have the required output on its queue, schedule
 		# it for processing. If we should process all, we don't care about the
 		# amount as it should process until its all done.
-		if self._workers:
-			if count < 1 or task._out_wc.size() < count:
+		if count < 1 or task._out_wc.size() < count:
+			numchunks = 1
+			chunksize = count
+			remainder = 0
+
+			# we need the count set for this - can't chunk up unlimited items
+			# In serial mode we could do this by checking for empty input channels,
+			# but in dispatch mode its impossible ( == not easily possible )
+			# Only try it if we have enough demand
+			if task.max_chunksize and count > task.max_chunksize:
+				numchunks = count / task.max_chunksize
+				chunksize = task.max_chunksize
+				remainder = count - (numchunks * chunksize)
+			# END handle chunking
+
+			print count, numchunks, chunksize, remainder
+			# the following loops are kind of unrolled - code duplication
+			# should make things execute faster. Putting the if statements
+			# into the loop would be less code, but ... slower
+			if self._workers:
 				# respect the chunk size, and split the task up if we want
 				# to process too much. This can be defined per task
 				queue = self._queue
-				if task.max_chunksize:
-					chunksize = count / task.max_chunksize
-					remainder = count - (chunksize * task.max_chunksize)
-					for i in xrange(chunksize):
+				if numchunks > 1:
+					for i in xrange(numchunks):
 						queue.put((task.process, chunksize))
-					if remainder:
-						queue.put((task.process, remainder))
+					# END for each chunk to put
+				else:
+					queue.put((task.process, chunksize))
+				# END try efficient looping
+
+				if remainder:
+					queue.put((task.process, remainder))
+				# END handle chunksize
+			else:
+				# no workers, so we have to do the work ourselves
+				if numchunks > 1:
+					for i in xrange(numchunks):
+						task.process(chunksize)
+					# END for each chunk to put
 				else:
-					self._queue.put((task.process, count))
+					task.process(chunksize)
+				# END try efficient looping
+
+				if remainder:
+					task.process(remainder)
 				# END handle chunksize
-			# END handle queuing
-		else:
-			# no workers, so we have to do the work ourselves
-			task.process(count)
-		# END handle serial mode
+
+				# as we are serial, we can check for consumption right away
+				if task.error() or task.is_done():
+					self._consumed_tasks.append(task)
+				# END handle consumption
+			# END handle serial mode
+		# END handle queuing
 
 		# always walk the whole graph, we want to find consumed tasks
 		return True
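The heart of the patch is the arithmetic that splits a demand of `count` items into `numchunks` chunks of `chunksize` each, plus a `remainder` that is dispatched as one extra work item. Below is a minimal, standalone sketch of that computation, rewritten for Python 3 (the patch itself targets Python 2, hence the bare `print` statement and integer `/` division); the `split_into_chunks` helper name is invented for illustration and is not part of the commit.

```python
def split_into_chunks(count, max_chunksize):
    """Return (numchunks, chunksize, remainder) as the patched
    ThreadPool computes them before queuing work items."""
    # defaults: a single chunk covering the whole demand
    numchunks = 1
    chunksize = count
    remainder = 0
    # chunk only when a limit is set and demand exceeds it
    if max_chunksize and count > max_chunksize:
        numchunks = count // max_chunksize   # Python 2 `/` is floor division on ints
        chunksize = max_chunksize
        remainder = count - (numchunks * chunksize)
    return numchunks, chunksize, remainder


if __name__ == "__main__":
    # 10 items with a chunk limit of 4: two full chunks plus a remainder of 2,
    # i.e. the worker queue receives (process, 4), (process, 4), (process, 2)
    assert split_into_chunks(10, 4) == (2, 4, 2)
    # no chunk limit: one chunk covering everything
    assert split_into_chunks(10, 0) == (1, 10, 0)
    # demand below the limit: no chunking either
    assert split_into_chunks(3, 4) == (1, 3, 0)
    print(split_into_chunks(10, 4))
```

Note that the patch runs this same split in both branches: with workers it queues one `(task.process, chunksize)` item per chunk plus the remainder, and in serial mode it calls `task.process()` directly with the same sizes. Per the inline comment, the dispatch loop is deliberately duplicated ("unrolled") across the two branches, trading code duplication for fewer branch checks per item.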