summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/git/async')
-rw-r--r--lib/git/async/graph.py7
-rw-r--r--lib/git/async/pool.py66
-rw-r--r--lib/git/async/task.py2
3 files changed, 58 insertions, 17 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index d817eeb4..6386cbaa 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -35,12 +35,17 @@ class Graph(object):
def del_node(self, node):
"""Delete a node from the graph
:return: self"""
+ try:
+ del(self.nodes[self.nodes.index(node)])
+ except ValueError:
+ return self
+ # END ignore if it doesn't exist
+
# clear connections
for outn in node.out_nodes:
del(outn.in_nodes[outn.in_nodes.index(node)])
for inn in node.in_nodes:
del(inn.out_nodes[inn.out_nodes.index(node)])
- del(self.nodes[self.nodes.index(node)])
return self
def add_edge(self, u, v):
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 2efc862b..620e2258 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -117,36 +117,72 @@ class ThreadPool(object):
queue all others for processing by our worker threads ( if available )."""
if task.error() or task.is_done():
self._consumed_tasks.append(task)
+ return True
+ # END stop processing
# allow min-count override. This makes sure we take at least min-count
# items off the input queue ( later )
- if task.min_count is not None:
+ if task.min_count is not None and count != 0 and count < task.min_count:
count = task.min_count
# END handle min-count
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
- if self._workers:
- if count < 1 or task._out_wc.size() < count:
+ if count < 1 or task._out_wc.size() < count:
+ numchunks = 1
+ chunksize = count
+ remainder = 0
+
+ # we need the count set for this - can't chunk up unlimited items
+ # In serial mode we could do this by checking for empty input channels,
+ # but in dispatch mode its impossible ( == not easily possible )
+ # Only try it if we have enough demand
+ if task.max_chunksize and count > task.max_chunksize:
+ numchunks = count / task.max_chunksize
+ chunksize = task.max_chunksize
+ remainder = count - (numchunks * chunksize)
+ # END handle chunking
+
+ print count, numchunks, chunksize, remainder
+ # the following loops are kind of unrolled - code duplication
+ # should make things execute faster. Putting the if statements
+ # into the loop would be less code, but ... slower
+ if self._workers:
# respect the chunk size, and split the task up if we want
# to process too much. This can be defined per task
queue = self._queue
- if task.max_chunksize:
- chunksize = count / task.max_chunksize
- remainder = count - (chunksize * task.max_chunksize)
- for i in xrange(chunksize):
+ if numchunks > 1:
+ for i in xrange(numchunks):
queue.put((task.process, chunksize))
- if remainder:
- queue.put((task.process, remainder))
+ # END for each chunk to put
+ else:
+ queue.put((task.process, chunksize))
+ # END try efficient looping
+
+ if remainder:
+ queue.put((task.process, remainder))
+ # END handle chunksize
+ else:
+ # no workers, so we have to do the work ourselves
+ if numchunks > 1:
+ for i in xrange(numchunks):
+ task.process(chunksize)
+ # END for each chunk to put
else:
- self._queue.put((task.process, count))
+ task.process(chunksize)
+ # END try efficient looping
+
+ if remainder:
+ task.process(remainder)
# END handle chunksize
- # END handle queuing
- else:
- # no workers, so we have to do the work ourselves
- task.process(count)
- # END handle serial mode
+
+ # as we are serial, we can check for consumption right away
+ if task.error() or task.is_done():
+ self._consumed_tasks.append(task)
+ # END handle consumption
+ # END handle serial mode
+ # END handle queuing
# always walk the whole graph, we want to find consumed tasks
return True
diff --git a/lib/git/async/task.py b/lib/git/async/task.py
index d2422773..ec650237 100644
--- a/lib/git/async/task.py
+++ b/lib/git/async/task.py
@@ -58,7 +58,7 @@ class OutputChannelTask(Node):
# if we didn't get all demanded items, which is also the case if count is 0
# we have depleted the input channel and are done
- if len(items) != count:
+ if not items or len(items) != count:
self.set_done()
# END handle done state
#{ Configuration