diff options
Diffstat (limited to 'lib/git/async')
-rw-r--r-- | lib/git/async/graph.py | 7 | ||||
-rw-r--r-- | lib/git/async/pool.py | 66 | ||||
-rw-r--r-- | lib/git/async/task.py | 2 |
3 files changed, 58 insertions, 17 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index d817eeb4..6386cbaa 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -35,12 +35,17 @@ class Graph(object): def del_node(self, node): """Delete a node from the graph :return: self""" + try: + del(self.nodes[self.nodes.index(node)]) + except ValueError: + return self + # END ignore if it doesn't exist + # clear connections for outn in node.out_nodes: del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) - del(self.nodes[self.nodes.index(node)]) return self def add_edge(self, u, v): diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py index 2efc862b..620e2258 100644 --- a/lib/git/async/pool.py +++ b/lib/git/async/pool.py @@ -117,36 +117,72 @@ class ThreadPool(object): queue all others for processing by our worker threads ( if available ).""" if task.error() or task.is_done(): self._consumed_tasks.append(task) + return True + # END stop processing # allow min-count override. This makes sure we take at least min-count # items off the input queue ( later ) - if task.min_count is not None: + if task.min_count is not None and count != 0 and count < task.min_count: count = task.min_count # END handle min-count # if the task does not have the required output on its queue, schedule # it for processing. If we should process all, we don't care about the # amount as it should process until its all done. - if self._workers: - if count < 1 or task._out_wc.size() < count: + if count < 1 or task._out_wc.size() < count: + numchunks = 1 + chunksize = count + remainder = 0 + + # we need the count set for this - can't chunk up unlimited items + # In serial mode we could do this by checking for empty input channels, + # but in dispatch mode its impossible ( == not easily possible ) + # Only try it if we have enough demand + if task.max_chunksize and count > task.max_chunksize: + numchunks = count / task.max_chunksize + chunksize = task.max_chunksize + remainder = count - (numchunks * chunksize) + # END handle chunking + + print count, numchunks, chunksize, remainder + # the following loops are kind of unrolled - code duplication + # should make things execute faster. Putting the if statements + # into the loop would be less code, but ... slower + if self._workers: # respect the chunk size, and split the task up if we want # to process too much. This can be defined per task queue = self._queue - if task.max_chunksize: - chunksize = count / task.max_chunksize - remainder = count - (chunksize * task.max_chunksize) - for i in xrange(chunksize): + if numchunks > 1: + for i in xrange(numchunks): queue.put((task.process, chunksize)) - if remainder: - queue.put((task.process, remainder)) + # END for each chunk to put + else: + queue.put((task.process, chunksize)) + # END try efficient looping + + if remainder: + queue.put((task.process, remainder)) + # END handle chunksize + else: + # no workers, so we have to do the work ourselves + if numchunks > 1: + for i in xrange(numchunks): + task.process(chunksize) + # END for each chunk to put else: - self._queue.put((task.process, count)) + task.process(chunksize) + # END try efficient looping + + if remainder: + task.process(remainder) # END handle chunksize - # END handle queuing - else: - # no workers, so we have to do the work ourselves - task.process(count) - # END handle serial mode + + # as we are serial, we can check for consumption right away + if task.error() or task.is_done(): + self._consumed_tasks.append(task) + # END handle consumption + # END handle serial mode + # END handle queuing # always walk the whole graph, we want to find consumed tasks return True diff --git a/lib/git/async/task.py b/lib/git/async/task.py index d2422773..ec650237 100644 --- a/lib/git/async/task.py +++ b/lib/git/async/task.py @@ -58,7 +58,7 @@ class OutputChannelTask(Node): # if we didn't get all demanded items, which is also the case if count is 0 # we have depleted the input channel and are done - if len(items) != count: + if not items or len(items) != count: self.set_done() # END handle done state #{ Configuration |