summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/git/async/graph.py84
-rw-r--r--lib/git/async/pool.py53
-rw-r--r--lib/git/async/thread.py27
-rw-r--r--test/git/async/test_graph.py82
-rw-r--r--test/git/async/test_thread.py4
5 files changed, 209 insertions, 41 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index 0c0a2137..b4d6aa00 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -6,31 +6,95 @@ class Node(object):
we need"""
__slots__ = ('in_nodes', 'out_nodes')
+ def __init__(self):
+ self.in_nodes = list()
+ self.out_nodes = list()
+
class Graph(object):
"""A simple graph implementation, keeping nodes and providing basic access and
- editing functions"""
+ editing functions. The performance is only suitable for small graphs of not
+ more than 10 nodes !"""
__slots__ = "nodes"
def __init__(self):
self.nodes = list()
def add_node(self, node):
- """Add a new node to the graph"""
- raise NotImplementedError()
+ """Add a new node to the graph
+ :return: the newly added node"""
+ self.nodes.append(node)
+ return node
def del_node(self, node):
- """Delete a node from the graph"""
- raise NotImplementedError()
+ """Delete a node from the graph
+ :return: self"""
+ # clear connections
+ for outn in node.out_nodes:
+ del(outn.in_nodes[outn.in_nodes.index(node)])
+ for inn in node.in_nodes:
+ del(inn.out_nodes[inn.out_nodes.index(node)])
+ del(self.nodes[self.nodes.index(node)])
+ return self
def add_edge(self, u, v):
"""Add an undirected edge between the given nodes u and v.
+
+ return: self
:raise ValueError: If the new edge would create a cycle"""
- raise NotImplementedError()
+ if u is v:
+ raise ValueError("Cannot connect a node with itself")
+
+ # are they already connected ?
+ if u in v.in_nodes and v in u.out_nodes or \
+ v in u.in_nodes and u in v.out_nodes:
+ return self
+ # END handle connection exists
+
+ # cycle check - if we can reach any of the two by following either ones
+ # history, its a cycle
+ for start, end in ((u, v), (v,u)):
+ if not start.in_nodes:
+ continue
+ nodes = start.in_nodes[:]
+ seen = set()
+ # depth first search - its faster
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ if n is end:
+ raise ValueError("Connecting u with v would create a cycle")
+ nodes.extend(n.in_nodes)
+ # END while we are searching
+ # END for each direction to look
+
+ # connection is valid, set it up
+ u.out_nodes.append(v)
+ v.in_nodes.append(u)
+
+ return self
- def visit_input_depth_first(self, node, visitor=lambda n: True ):
+ def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ):
"""Visit all input nodes of the given node, depth first, calling visitor
for each node on our way. If the function returns False, the traversal
- will not go any deeper, but continue at the next branch"""
- raise NotImplementedError()
-
+ will not go any deeper, but continue at the next branch
+ It will return the actual input node in the end !"""
+ nodes = node.in_nodes[:]
+ seen = set()
+
+ # depth first
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+
+ # only proceed in that direction if visitor is fine with it
+ if visitor(n):
+ nodes.extend(n.in_nodes)
+ # END call visitor
+ # END while walking
+ visitor(node)
+
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7798d3d4..9a24cbc5 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,7 +25,8 @@ class TaskNode(Node):
'_out_wc', # output write channel
'_pool_ref', # ref to our pool
'_exc', # exception caught
- 'fun', # function to call with items read from in_rc
+ 'fun', # function to call with items read from in_rc
+ 'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximium amount of items to process per process call
'apply_single' # apply single items even if multiple where read
)
@@ -36,6 +37,7 @@ class TaskNode(Node):
self._pool_ref = None
self._exc = None
self.fun = fun
+ self.min_count = None
self.max_chunksize = 0 # note set
self.apply_single = apply_single
@@ -174,6 +176,12 @@ class ThreadPool(object):
if task.error() or task.is_done():
self._consumed_tasks.append(task)
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue ( later )
+ if task.min_count is not None:
+ count = task.min_count
+ # END handle min-count
+
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
@@ -213,7 +221,7 @@ class ThreadPool(object):
Tasks which are not done will be put onto the queue for processing, which
is fine as we walked them depth-first."""
- self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
@@ -233,7 +241,9 @@ class ThreadPool(object):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
- its items."""
+ its items.
+
+ :return: self"""
# now delete our actual node - must set it done os it closes its channels.
# Otherwise further reads of output tasks will block.
# Actually they may still block if anyone wants to read all ... without
@@ -246,12 +256,45 @@ class ThreadPool(object):
for t in in_tasks
self._del_task_if_orphaned(t)
# END handle orphans recursively
+
+ return self
def set_pool_size(self, size=0):
- """Set the amount of workers to use in this pool.
+ """Set the amount of workers to use in this pool. When reducing the size,
+ the call may block as it waits for threads to finish.
+ When reducing the size to zero, this thread will process all remaining
+ items on the queue.
+
+ :return: self
:param size: if 0, the pool will do all work itself in the calling thread,
otherwise the work will be distributed among the given amount of threads"""
- raise NotImplementedError()
+ # either start new threads, or kill existing ones.
+ # If we end up with no threads, we process the remaining chunks on the queue
+ # ourselves
+ cur_count = len(self._workers)
+ if cur_count < size:
+ for i in range(size - cur_count):
+ worker = WorkerThread(self._queue)
+ self._workers.append(worker)
+ # END for each new worker to create
+ elif cur_count > size:
+ del_count = cur_count - size
+ for i in range(del_count):
+ self._workers[i].stop_and_join()
+ # END for each thread to stop
+ del(self._workers[:del_count])
+ # END handle count
+
+ if size == 0:
+ while not self._queue.empty():
+ try:
+ taskmethod, count = self._queue.get(False)
+ taskmethod(count)
+ except Queue.Empty:
+ continue
+ # END while there are tasks on the queue
+ # END process queue
+ return self
def add_task(self, task):
"""Add a new task to be processed.
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 3938666a..7ca93c86 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -85,9 +85,9 @@ class TerminatableThread(threading.Thread):
class WorkerThread(TerminatableThread):
- """
- This base allows to call functions on class instances natively and retrieve
- their results asynchronously using a queue.
+ """ This base allows to call functions on class instances natively.
+ As it is meant to work with a pool, the result of the call must be
+ handled by the callee.
The thread runs forever unless it receives the terminate signal using
its task queue.
@@ -95,11 +95,9 @@ class WorkerThread(TerminatableThread):
allow the following:
inq = Queue()
- outq = Queue()
- w = WorkerThread(inq, outq)
+ w = WorkerThread(inq)
w.start()
inq.put((WorkerThread.<method>, args, kwargs))
- res = outq.get()
finally we call quit to terminate asap.
@@ -120,10 +118,9 @@ class WorkerThread(TerminatableThread):
class InvalidRoutineError(Exception):
"""Class sent as return value in case of an error"""
- def __init__(self, inq = None, outq = None):
+ def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
- self.outq = outq or Queue.Queue()
def call(self, function, *args, **kwargs):
"""Method that makes the call to the worker using the input queue,
@@ -135,17 +132,6 @@ class WorkerThread(TerminatableThread):
:param args: arguments to pass to function
:parma **kwargs: kwargs to pass to function"""
self.inq.put((function, args, kwargs))
- return self.outq
-
- def wait_until_idle(self):
- """wait until the input queue is empty, in the meanwhile, take all
- results off the output queue."""
- while not self.inq.empty():
- try:
- self.outq.get(False)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
def run(self):
"""Process input tasks until we receive the quit signal"""
@@ -184,15 +170,12 @@ class WorkerThread(TerminatableThread):
else:
# ignore unknown items
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
- self.outq.put(self.InvalidRoutineError(routine))
break
# END make routine call
- self.outq.put(rval)
except StopIteration:
break
except Exception,e:
print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
- self.outq.put(e)
# END routine exception handling
# END endless loop
diff --git a/test/git/async/test_graph.py b/test/git/async/test_graph.py
index 18d6997c..400e92cd 100644
--- a/test/git/async/test_graph.py
+++ b/test/git/async/test_graph.py
@@ -7,4 +7,84 @@ import time
class TestGraph(TestBase):
def test_base(self):
- pass
+ g = Graph()
+ nn = 10
+ assert nn > 2, "need at least 3 nodes"
+
+ # add unconnected nodes
+ for i in range(nn):
+ assert isinstance(g.add_node(Node()), Node)
+ # END add nodes
+ assert len(g.nodes) == nn
+
+ # delete unconnected nodes
+ for n in g.nodes[:]:
+ g.del_node(n)
+ # END del nodes
+
+ # add a chain of connected nodes
+ last = None
+ for i in range(nn):
+ n = g.add_node(Node())
+ if last:
+ assert not last.out_nodes
+ assert not n.in_nodes
+ assert g.add_edge(last, n) is g
+ assert last.out_nodes[0] is n
+ assert n.in_nodes[0] is last
+ last = n
+ # END for each node to connect
+
+ # try to connect a node with itself
+ self.failUnlessRaises(ValueError, g.add_edge, last, last)
+
+ # try to create a cycle
+ self.failUnlessRaises(ValueError, g.add_edge, g.nodes[0], g.nodes[-1])
+ self.failUnlessRaises(ValueError, g.add_edge, g.nodes[-1], g.nodes[0])
+
+ # we have undirected edges, readding the same edge, but the other way
+ # around does not change anything
+ n1, n2, n3 = g.nodes[0], g.nodes[1], g.nodes[2]
+ g.add_edge(n1, n2) # already connected
+ g.add_edge(n2, n1) # same thing
+ assert len(n1.out_nodes) == 1
+ assert len(n1.in_nodes) == 0
+ assert len(n2.in_nodes) == 1
+ assert len(n2.out_nodes) == 1
+
+ # deleting a connected node clears its neighbour connections
+ assert n3.in_nodes[0] is n2
+ g.del_node(n2)
+ assert len(g.nodes) == nn - 1
+ assert len(n3.in_nodes) == 0
+ assert len(n1.out_nodes) == 0
+
+ # check the history from the last node
+ last = g.nodes[-1]
+ class Visitor(object):
+ def __init__(self, origin):
+ self.origin_seen = False
+ self.origin = origin
+ self.num_seen = 0
+
+ def __call__(self, n):
+ if n is self.origin:
+ self.origin_seen = True
+ else:
+ assert not self.origin_seen, "should see origin last"
+ # END check origin
+ self.num_seen += 1
+ return True
+
+ def _assert(self, num_expected):
+ assert self.origin_seen
+ assert self.num_seen == num_expected
+ # END visitor helper
+
+ end = g.nodes[-1]
+ visitor = Visitor(end)
+ g.visit_input_inclusive_depth_first(end, visitor)
+
+ num_nodes_seen = nn - 2 # deleted second, which leaves first one disconnected
+ visitor._assert(num_nodes_seen)
+
diff --git a/test/git/async/test_thread.py b/test/git/async/test_thread.py
index ca306cc0..2ea8d1ff 100644
--- a/test/git/async/test_thread.py
+++ b/test/git/async/test_thread.py
@@ -37,9 +37,7 @@ class TestThreads( TestCase ):
# test different method types
standalone_func = lambda *args, **kwargs: worker.fun(*args, **kwargs)
for function in ("fun", TestWorker.fun, worker.fun, standalone_func):
- rval = worker.call(function, 1, this='that')
- assert isinstance(rval, Queue)
- assert rval.get() is True
+ worker.call(function, 1, this='that')
worker.make_assertion()
# END for each function type