Diffstat (limited to 'lib/git/async')
-rw-r--r--   lib/git/async/graph.py   | 84
-rw-r--r--   lib/git/async/pool.py    | 53
-rw-r--r--   lib/git/async/thread.py  | 27
3 files changed, 127 insertions, 37 deletions
diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py
index 0c0a2137..b4d6aa00 100644
--- a/lib/git/async/graph.py
+++ b/lib/git/async/graph.py
@@ -6,31 +6,95 @@ class Node(object):
 	we need"""
 	__slots__ = ('in_nodes', 'out_nodes')
 	
+	def __init__(self):
+		self.in_nodes = list()
+		self.out_nodes = list()
+	
 
 class Graph(object):
 	"""A simple graph implementation, keeping nodes and providing basic access and 
-	editing functions"""
+	editing functions. The performance is only suitable for small graphs of not 
+	more than 10 nodes !"""
 	__slots__ = "nodes"
 	
 	def __init__(self):
 		self.nodes = list()
 	
 	def add_node(self, node):
-		"""Add a new node to the graph"""
-		raise NotImplementedError()
+		"""Add a new node to the graph
+		:return: the newly added node"""
+		self.nodes.append(node)
+		return node
 	
 	def del_node(self, node):
-		"""Delete a node from the graph"""
-		raise NotImplementedError()
+		"""Delete a node from the graph
+		:return: self"""
+		# clear connections
+		for outn in node.out_nodes:
+			del(outn.in_nodes[outn.in_nodes.index(node)])
+		for inn in node.in_nodes:
+			del(inn.out_nodes[inn.out_nodes.index(node)])
+		del(self.nodes[self.nodes.index(node)])
+		return self
	
 	def add_edge(self, u, v):
 		"""Add an undirected edge between the given nodes u and v.
+		
+		return: self
 		:raise ValueError: If the new edge would create a cycle"""
-		raise NotImplementedError()
+		if u is v:
+			raise ValueError("Cannot connect a node with itself")
+		
+		# are they already connected ?
+		if u in v.in_nodes and v in u.out_nodes or \
+			v in u.in_nodes and u in v.out_nodes:
+			return self
+		# END handle connection exists
+		
+		# cycle check - if we can reach any of the two by following either ones 
+		# history, its a cycle
+		for start, end in ((u, v), (v,u)):
+			if not start.in_nodes:
+				continue
+			nodes = start.in_nodes[:]
+			seen = set()
+			# depth first search - its faster
+			while nodes:
+				n = nodes.pop()
+				if n in seen:
+					continue
+				seen.add(n)
+				if n is end:
+					raise ValueError("Connecting u with v would create a cycle")
+				nodes.extend(n.in_nodes)
+			# END while we are searching
+		# END for each direction to look
+		
+		# connection is valid, set it up
+		u.out_nodes.append(v)
+		v.in_nodes.append(u)
+		
+		return self
 	
-	def visit_input_depth_first(self, node, visitor=lambda n: True ):
+	def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ):
 		"""Visit all input nodes of the given node, depth first, calling visitor
 		for each node on our way. If the function returns False, the traversal 
-		will not go any deeper, but continue at the next branch"""
-		raise NotImplementedError()
-	
+		will not go any deeper, but continue at the next branch
+		It will return the actual input node in the end !"""
+		nodes = node.in_nodes[:]
+		seen = set()
+		
+		# depth first
+		while nodes:
+			n = nodes.pop()
+			if n in seen:
+				continue
+			seen.add(n)
+			
+			# only proceed in that direction if visitor is fine with it
+			if visitor(n):
+				nodes.extend(n.in_nodes)
+			# END call visitor
+		# END while walking
+		visitor(node)
+	
diff --git a/lib/git/async/pool.py b/lib/git/async/pool.py
index 7798d3d4..9a24cbc5 100644
--- a/lib/git/async/pool.py
+++ b/lib/git/async/pool.py
@@ -25,7 +25,8 @@ class TaskNode(Node):
 		'_out_wc',			# output write channel
 		'_pool_ref',		# ref to our pool
 		'_exc',				# exception caught
-		'fun',				# function to call with items read from in_rc
+		'fun',				# function to call with items read from in_rc
+		'min_count',		# minimum amount of items to produce, None means no override
 		'max_chunksize',	# maximium amount of items to process per process call
 		'apply_single'		# apply single items even if multiple where read
 		)
@@ -36,6 +37,7 @@ class TaskNode(Node):
 		self._pool_ref = None
 		self._exc = None
 		self.fun = fun
+		self.min_count = None
 		self.max_chunksize = 0		# note set
 		self.apply_single = apply_single
 	
@@ -174,6 +176,12 @@ class ThreadPool(object):
 		if task.error() or task.is_done():
 			self._consumed_tasks.append(task)
 		
+		# allow min-count override. This makes sure we take at least min-count 
+		# items off the input queue ( later )
+		if task.min_count is not None:
+			count = task.min_count
+		# END handle min-count
+		
 		# if the task does not have the required output on its queue, schedule
 		# it for processing. If we should process all, we don't care about the 
 		# amount as it should process until its all done.
@@ -213,7 +221,7 @@ class ThreadPool(object):
 		Tasks which are not done will be put onto the queue for processing, which 
 		is fine as we walked them depth-first."""
 		
-		self._tasks.visit_input_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
+		self._tasks.visit_input_inclusive_depth_first(task, lambda n: self._queue_feeder_visitor(n, count))
 		
 		# delete consumed tasks to cleanup
 		for task in self._consumed_tasks:
@@ -233,7 +241,9 @@ class ThreadPool(object):
 		"""Delete the task
 		Additionally we will remove orphaned tasks, which can be identified if their 
 		output channel is only held by themselves, so no one will ever consume 
-		its items."""
+		its items.
+		
+		:return: self"""
 		# now delete our actual node - must set it done os it closes its channels.
 		# Otherwise further reads of output tasks will block.
 		# Actually they may still block if anyone wants to read all ... without 
@@ -246,12 +256,45 @@
 		for t in in_tasks:
 			self._del_task_if_orphaned(t)
 		# END handle orphans recursively
+		
+		return self
 	
 	def set_pool_size(self, size=0):
-		"""Set the amount of workers to use in this pool.
+		"""Set the amount of workers to use in this pool. When reducing the size, 
+		the call may block as it waits for threads to finish. 
+		When reducing the size to zero, this thread will process all remaining 
+		items on the queue. 
+		
+		:return: self
 		:param size: if 0, the pool will do all work itself in the calling thread, 
 			otherwise the work will be distributed among the given amount of threads"""
-		raise NotImplementedError()
+		# either start new threads, or kill existing ones.
+		# If we end up with no threads, we process the remaining chunks on the queue
+		# ourselves
+		cur_count = len(self._workers)
+		if cur_count < size:
+			for i in range(size - cur_count):
+				worker = WorkerThread(self._queue)
+				self._workers.append(worker)
+			# END for each new worker to create
+		elif cur_count > size:
+			del_count = cur_count - size
+			for i in range(del_count):
+				self._workers[i].stop_and_join()
+			# END for each thread to stop
+			del(self._workers[:del_count])
+		# END handle count
+		
+		if size == 0:
+			while not self._queue.empty():
+				try:
+					taskmethod, count = self._queue.get(False)
+					taskmethod(count)
+				except Queue.Empty:
+					continue
+			# END while there are tasks on the queue
+		# END process queue
+		return self
 	
 	def add_task(self, task):
 		"""Add a new task to be processed.
diff --git a/lib/git/async/thread.py b/lib/git/async/thread.py
index 3938666a..7ca93c86 100644
--- a/lib/git/async/thread.py
+++ b/lib/git/async/thread.py
@@ -85,9 +85,9 @@ class TerminatableThread(threading.Thread):
 
 
 class WorkerThread(TerminatableThread):
-	"""
-	This base allows to call functions on class instances natively and retrieve 
-	their results asynchronously using a queue.
+	""" This base allows to call functions on class instances natively.
+	As it is meant to work with a pool, the result of the call must be 
+	handled by the callee.
 	The thread runs forever unless it receives the terminate signal using 
 	its task queue.
 	
@@ -95,11 +95,9 @@ class WorkerThread(TerminatableThread):
 	allow the following:
 	
 	inq = Queue()
-	outq = Queue()
-	w = WorkerThread(inq, outq)
+	w = WorkerThread(inq)
 	w.start()
 	inq.put((WorkerThread.<method>, args, kwargs))
-	res = outq.get()
 	
 	finally we call quit to terminate asap.
 	
@@ -120,10 +118,9 @@ class WorkerThread(TerminatableThread):
 	class InvalidRoutineError(Exception):
 		"""Class sent as return value in case of an error"""
 	
-	def __init__(self, inq = None, outq = None):
+	def __init__(self, inq = None):
 		super(WorkerThread, self).__init__()
 		self.inq = inq or Queue.Queue()
-		self.outq = outq or Queue.Queue()
 	
 	def call(self, function, *args, **kwargs):
 		"""Method that makes the call to the worker using the input queue, 
@@ -135,17 +132,6 @@ class WorkerThread(TerminatableThread):
 		:param args: arguments to pass to function
 		:parma **kwargs: kwargs to pass to function"""
 		self.inq.put((function, args, kwargs))
-		return self.outq
-		
-	def wait_until_idle(self):
-		"""wait until the input queue is empty, in the meanwhile, take all 
-		results off the output queue."""
-		while not self.inq.empty():
-			try:
-				self.outq.get(False)
-			except Queue.Empty:
-				continue
-		# END while there are tasks on the queue
 	
 	def run(self):
 		"""Process input tasks until we receive the quit signal"""
@@ -184,15 +170,12 @@ class WorkerThread(TerminatableThread):
 				else:
 					# ignore unknown items
 					print "%s: task %s was not understood - terminating" % (self.getName(), str(tasktuple))
-					self.outq.put(self.InvalidRoutineError(routine))
 					break
 				# END make routine call
 				
-				self.outq.put(rval)
 			except StopIteration:
 				break
 			except Exception,e:
 				print "%s: Task %s raised unhandled exception: %s" % (self.getName(), str(tasktuple), str(e))
-				self.outq.put(e)
 			# END routine exception handling
 		# END endless loop
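The Graph/Node implementation introduced above is small enough to exercise end to end. A minimal usage sketch, not part of the commit, assuming the module is importable as git.async.graph; the node variables and visitor below are illustrative only:

	from git.async.graph import Graph, Node

	g = Graph()
	a = g.add_node(Node())            # add_node now returns the node it stored
	b = g.add_node(Node())
	c = g.add_node(Node())

	g.add_edge(a, b).add_edge(b, c)   # add_edge returns self, so calls chain

	try:
		g.add_edge(c, a)              # would close the cycle a -> b -> c -> a
	except ValueError:
		pass                          # rejected by the depth-first cycle check

	order = list()
	def visitor(n):
		order.append(n)
		return True                   # True: keep descending into n's inputs

	# walks b's inputs depth-first and finally b itself, so order == [a, b]
	g.visit_input_inclusive_depth_first(b, visitor)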
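The pool changes are harder to demonstrate in isolation because the ThreadPool constructor and the body of add_task() are outside this diff. The sketch below therefore only leans on what the hunks above show and marks everything else as an assumption: set_pool_size() grows or shrinks the worker list and returns self, size 0 makes the calling thread drain the remaining (taskmethod, count) chunks, and a task's new min_count attribute, when set, overrides how many items the scheduler pulls per round.

	from git.async.pool import ThreadPool

	pool = ThreadPool()      # assumed default constructor - not shown in this diff
	pool.set_pool_size(4)    # start four WorkerThreads reading from the pool's queue

	# ... add tasks via pool.add_task(...) and read from their output channels ...

	pool.set_pool_size(0)    # stop the workers; chunks still queued are executed
	                         # in the calling thread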
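The WorkerThread docstring above already outlines the new protocol; spelled out as runnable code it looks roughly like the sketch below. It assumes the run loop dispatches plain callables the same way it handles the documented (WorkerThread.<method>, args, kwargs) tuples, and uses stop_and_join(), the TerminatableThread call the pool itself relies on when shrinking:

	from git.async.thread import WorkerThread

	results = list()

	def work(item):
		results.append(item * 2)   # the callee is now responsible for delivering results

	w = WorkerThread()             # builds its own input queue when none is passed
	w.start()
	for i in range(4):
		w.call(work, i)            # enqueues (work, (i,), {}) on w.inq
	w.stop_and_join()              # ask the thread to terminate and wait for it;
	                               # items still queued may not be processed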