summaryrefslogtreecommitdiff
path: root/lib/git/async
diff options
context:
space:
mode:
author: Sebastian Thiel <byronimo@gmail.com> 2010-06-06 12:48:25 +0200
committer: Sebastian Thiel <byronimo@gmail.com> 2010-06-06 12:48:25 +0200
commit: ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 (patch)
tree: 90003f8f93becbb0b8aacd4c2ff7119842fa8003 /lib/git/async
parent: b72e2704022d889f116e49abf3e1e5d3e3192d3b (diff)
download: gitpython-ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2.tar.gz
thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system
graph: implemented it, including a test, according to the pool's requirements
pool: implemented set_pool_size
Diffstat (limited to 'lib/git/async')
-rw-r--r-- lib/git/async/graph.py 84
-rw-r--r-- lib/git/async/pool.py 53
-rw-r--r-- lib/git/async/thread.py 27
3 files changed, 127 insertions, 37 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index 0c0a2137..b4d6aa00 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -6,31 +6,95 @@ class Node(object):
we need"""
__slots__ = ('in_nodes', 'out_nodes')
+ def __init__(self):
+ self.in_nodes = list()
+ self.out_nodes = list()
+
class Graph(object):
"""A simple graph implementation, keeping nodes and providing basic access and
- editing functions"""
+ editing functions. The performance is only suitable for small graphs of not
+ more than 10 nodes !"""
__slots__ = "nodes"
def __init__(self):
self.nodes = list()
def add_node(self, node):
- """Add a new node to the graph"""
- raise NotImplementedError()
+ """Add a new node to the graph
+ :return: the newly added node"""
+ self.nodes.append(node)
+ return node
def del_node(self, node):
- """Delete a node from the graph"""
- raise NotImplementedError()
+ """Delete a node from the graph
+ :return: self"""
+ # clear connections
+ for outn in node.out_nodes:
+ del(outn.in_nodes[outn.in_nodes.index(node)])
+ for inn in node.in_nodes:
+ del(inn.out_nodes[inn.out_nodes.index(node)])
+ del(self.nodes[self.nodes.index(node)])
+ return self
def add_edge(self, u, v):
"""Add a directed edge from node u to node v (v becomes an output of u).
+
+ :return: self
:raise ValueError: If the new edge would create a cycle"""
- raise NotImplementedError()
+ if u is v:
+ raise ValueError("Cannot connect a node with itself")
+
+ # are they already connected ?
+ if u in v.in_nodes and v in u.out_nodes or \
+ v in u.in_nodes and u in v.out_nodes:
+ return self
+ # END handle connection exists
+
+ # cycle check - if either node can be reached by following the other one's
+ # input history, it's a cycle
+ for start, end in ((u, v), (v,u)):
+ if not start.in_nodes:
+ continue
+ nodes = start.in_nodes[:]
+ seen = set()
+ # depth first search - its faster
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ if n is end:
+ raise ValueError("Connecting u with v would create a cycle")
+ nodes.extend(n.in_nodes)
+ # END while we are searching
+ # END for each direction to look
+
+ # connection is valid, set it up
+ u.out_nodes.append(v)
+ v.in_nodes.append(u)
+
+ return self
- def visit_input_depth_first(self, node, visitor=lambda n: True ):
+ def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ):
"""Visit all input nodes of the given node, depth first, calling visitor
for each node on our way. If the function returns False, the traversal
- will not go any deeper, but continue at the next branch"""
- raise NotImplementedError()
-
+ will not go any deeper, but continue at the next branch
+ The given node itself is passed to the visitor last (hence 'inclusive')."""
+ nodes = node.in_nodes[:]
+ seen = set()
+
+ # depth first
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+
+ # only proceed in that direction if visitor is fine with it
+ if visitor(n):
+ nodes.extend(n.in_nodes)
+ # END call visitor
+ # END while walking
+ visitor(node)
+
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7798d3d4..9a24cbc5 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,7 +25,8 @@ class TaskNode(Node):
'_out_wc', # output write channel
'_pool_ref', # ref to our pool
'_exc', # exception caught
- 'fun', # function to call with items read from in_rc
+ 'fun', # function to call with items read from in_rc
+ 'min_count', # minimum amount of items to produce, None means no override
'max_chunksize', # maximum amount of items to process per process call
'apply_single' # apply single items even if multiple were read
)
@@ -36,6 +37,7 @@ class TaskNode(Node):
self._pool_ref = None
self._exc = None
self.fun = fun
+ self.min_count = None
self.max_chunksize = 0 # not set
self.apply_single = apply_single
@@ -174,6 +176,12 @@ class ThreadPool(object):
if task.error() or task.is_done():
self._consumed_tasks.append(task)
+ # allow min-count override. This makes sure we take at least min-count
+ # items off the input queue ( later )
+ if task.min_count is not None:
+ count = task.min_count
+ # END handle min-count
+
# if the task does not have the required output on its queue, schedule
# it for processing. If we should process all, we don't care about the
# amount as it should process until its all done.
@@ -213,7 +221,7 @@ class ThreadPool(object):
Tasks which are not done will be put onto the queue for processing, which
is fine as we walked them depth-first."""
- self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+ self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
# delete consumed tasks to cleanup
for task in self._consumed_tasks:
@@ -233,7 +241,9 @@ class ThreadPool(object):
"""Delete the task
Additionally we will remove orphaned tasks, which can be identified if their
output channel is only held by themselves, so no one will ever consume
- its items."""
+ its items.
+
+ :return: self"""
# now delete our actual node - must set it done so it closes its channels.
# Otherwise further reads of output tasks will block.
# Actually they may still block if anyone wants to read all ... without
@@ -246,12 +256,45 @@ class ThreadPool(object):
for t in in_tasks
self._del_task_if_orphaned(t)
# END handle orphans recursively
+
+ return self
def set_pool_size(self, size=0):
- """Set the amount of workers to use in this pool.
+ """Set the amount of workers to use in this pool. When reducing the size,
+ the call may block as it waits for threads to finish.
+ When reducing the size to zero, this thread will process all remaining
+ items on the queue.
+
+ :return: self
:param size: if 0, the pool will do all work itself in the calling thread,
otherwise the work will be distributed among the given amount of threads"""
- raise NotImplementedError()
+ # either start new threads, or kill existing ones.
+ # If we end up with no threads, we process the remaining chunks on the queue
+ # ourselves
+ cur_count = len(self._workers)
+ if cur_count < size:
+ for i in range(size - cur_count):
+ worker = WorkerThread(self._queue)
+ self._workers.append(worker)
+ # END for each new worker to create
+ elif cur_count > size:
+ del_count = cur_count - size
+ for i in range(del_count):
+ self._workers[i].stop_and_join()
+ # END for each thread to stop
+ del(self._workers[:del_count])
+ # END handle count
+
+ if size == 0:
+ while not self._queue.empty():
+ try:
+ taskmethod, count = self._queue.get(False)
+ taskmethod(count)
+ except Queue.Empty:
+ continue
+ # END while there are tasks on the queue
+ # END process queue
+ return self
def add_task(self, task):
"""Add a new task to be processed.
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 3938666a..7ca93c86 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -85,9 +85,9 @@ class TerminatableThread(threading.Thread):
class WorkerThread(TerminatableThread):
- """
- This base allows to call functions on class instances natively and retrieve
- their results asynchronously using a queue.
+ """ This base allows to call functions on class instances natively.
+ As it is meant to work with a pool, the result of the call must be
+ handled by the callee.
The thread runs forever unless it receives the terminate signal using
its task queue.
@@ -95,11 +95,9 @@ class WorkerThread(TerminatableThread):
allow the following:
inq = Queue()
- outq = Queue()
- w = WorkerThread(inq, outq)
+ w = WorkerThread(inq)
w.start()
inq.put((WorkerThread.<method>, args, kwargs))
- res = outq.get()
finally we call quit to terminate asap.
@@ -120,10 +118,9 @@ class WorkerThread(TerminatableThread):
class InvalidRoutineError(Exception):
"""Class sent as return value in case of an error"""
- def __init__(self, inq = None, outq = None):
+ def __init__(self, inq = None):
super(WorkerThread, self).__init__()
self.inq = inq or Queue.Queue()
- self.outq = outq or Queue.Queue()
def call(self, function, *args, **kwargs):
"""Method that makes the call to the worker using the input queue,
@@ -135,17 +132,6 @@ class WorkerThread(TerminatableThread):
:param args: arguments to pass to function
:param **kwargs: kwargs to pass to function"""
self.inq.put((function, args, kwargs))
- return self.outq
-
- def wait_until_idle(self):
- """wait until the input queue is empty, in the meanwhile, take all
- results off the output queue."""
- while not self.inq.empty():
- try:
- self.outq.get(False)
- except Queue.Empty:
- continue
- # END while there are tasks on the queue
def run(self):
"""Process input tasks until we receive the quit signal"""
@@ -184,15 +170,12 @@ class WorkerThread(TerminatableThread):
else:
# ignore unknown items
print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
- self.outq.put(self.InvalidRoutineError(routine))
break
# END make routine call
- self.outq.put(rval)
except StopIteration:
break
except Exception,e:
print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
- self.outq.put(e)
# END routine exception handling
# END endless loop