From b72e2704022d889f116e49abf3e1e5d3e3192d3b Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 01:00:12 +0200 Subject: Improved pool design and started rough implementation, top down to learn while going. Tests will be written soon for verification, its still quite theoretical --- lib/git/async/graph.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 lib/git/async/graph.py (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py new file mode 100644 index 00000000..0c0a2137 --- /dev/null +++ b/lib/git/async/graph.py @@ -0,0 +1,36 @@ +"""Simplistic implementation of a graph""" + +class Node(object): + """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. + Its not designed to support big graphs, and sports only the functionality + we need""" + __slots__ = ('in_nodes', 'out_nodes') + + +class Graph(object): + """A simple graph implementation, keeping nodes and providing basic access and + editing functions""" + __slots__ = "nodes" + + def __init__(self): + self.nodes = list() + + def add_node(self, node): + """Add a new node to the graph""" + raise NotImplementedError() + + def del_node(self, node): + """Delete a node from the graph""" + raise NotImplementedError() + + def add_edge(self, u, v): + """Add an undirected edge between the given nodes u and v. + :raise ValueError: If the new edge would create a cycle""" + raise NotImplementedError() + + def visit_input_depth_first(self, node, visitor=lambda n: True ): + """Visit all input nodes of the given node, depth first, calling visitor + for each node on our way. If the function returns False, the traversal + will not go any deeper, but continue at the next branch""" + raise NotImplementedError() + -- cgit v1.2.1 From ec28ad575ce1d7bb6a616ffc404f32bbb1af67b2 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 12:48:25 +0200 Subject: thread: adjusted worker thread not to provide an output queue anymore - this is handled by the task system graph: implemented it including test according to the pools requirements pool: implemented set_pool_size --- lib/git/async/graph.py | 84 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 10 deletions(-) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 0c0a2137..b4d6aa00 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -6,31 +6,95 @@ class Node(object): we need""" __slots__ = ('in_nodes', 'out_nodes') + def __init__(self): + self.in_nodes = list() + self.out_nodes = list() + class Graph(object): """A simple graph implementation, keeping nodes and providing basic access and - editing functions""" + editing functions. The performance is only suitable for small graphs of not + more than 10 nodes !""" __slots__ = "nodes" def __init__(self): self.nodes = list() def add_node(self, node): - """Add a new node to the graph""" - raise NotImplementedError() + """Add a new node to the graph + :return: the newly added node""" + self.nodes.append(node) + return node def del_node(self, node): - """Delete a node from the graph""" - raise NotImplementedError() + """Delete a node from the graph + :return: self""" + # clear connections + for outn in node.out_nodes: + del(outn.in_nodes[outn.in_nodes.index(node)]) + for inn in node.in_nodes: + del(inn.out_nodes[inn.out_nodes.index(node)]) + del(self.nodes[self.nodes.index(node)]) + return self def add_edge(self, u, v): """Add an undirected edge between the given nodes u and v. + + return: self :raise ValueError: If the new edge would create a cycle""" - raise NotImplementedError() + if u is v: + raise ValueError("Cannot connect a node with itself") + + # are they already connected ? + if u in v.in_nodes and v in u.out_nodes or \ + v in u.in_nodes and u in v.out_nodes: + return self + # END handle connection exists + + # cycle check - if we can reach any of the two by following either ones + # history, its a cycle + for start, end in ((u, v), (v,u)): + if not start.in_nodes: + continue + nodes = start.in_nodes[:] + seen = set() + # depth first search - its faster + while nodes: + n = nodes.pop() + if n in seen: + continue + seen.add(n) + if n is end: + raise ValueError("Connecting u with v would create a cycle") + nodes.extend(n.in_nodes) + # END while we are searching + # END for each direction to look + + # connection is valid, set it up + u.out_nodes.append(v) + v.in_nodes.append(u) + + return self - def visit_input_depth_first(self, node, visitor=lambda n: True ): + def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): """Visit all input nodes of the given node, depth first, calling visitor for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch""" - raise NotImplementedError() - + will not go any deeper, but continue at the next branch + It will return the actual input node in the end !""" + nodes = node.in_nodes[:] + seen = set() + + # depth first + while nodes: + n = nodes.pop() + if n in seen: + continue + seen.add(n) + + # only proceed in that direction if visitor is fine with it + if visitor(n): + nodes.extend(n.in_nodes) + # END call visitor + # END while walking + visitor(node) + -- cgit v1.2.1 From b3cde0ee162b8f0cb67da981311c8f9c16050a62 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 18:13:21 +0200 Subject: First step of testing the pool - tasks have been separated into a new module including own tests, their design improved to prepare them for some specifics that would be needed for multiprocessing support --- lib/git/async/graph.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index b4d6aa00..d817eeb4 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -1,14 +1,20 @@ """Simplistic implementation of a graph""" class Node(object): - """A quick and dirty to the point implementation of a simple, and slow ascyclic graph. - Its not designed to support big graphs, and sports only the functionality - we need""" - __slots__ = ('in_nodes', 'out_nodes') + """A Node in the graph. They know their neighbours, and have an id which should + resolve into a string""" + __slots__ = ('in_nodes', 'out_nodes', 'id') - def __init__(self): + def __init__(self, id=None): + self.id = id self.in_nodes = list() self.out_nodes = list() + + def __str__(self): + return str(self.id) + + def __repr__(self): + return "%s(%s)" % (type(self).__name__, self.id) class Graph(object): -- cgit v1.2.1 From 1b27292936c81637f6b9a7141dafaad1126f268e Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sun, 6 Jun 2010 21:15:13 +0200 Subject: Plenty of fixes in the chunking routine, made possible by a serialized chunking test. Next up, actual async processing --- lib/git/async/graph.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index d817eeb4..6386cbaa 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -35,12 +35,17 @@ class Graph(object): def del_node(self, node): """Delete a node from the graph :return: self""" + try: + del(self.nodes[self.nodes.index(node)]) + except ValueError: + return self + # END ignore if it doesn't exist + # clear connections for outn in node.out_nodes: del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) - del(self.nodes[self.nodes.index(node)]) return self def add_edge(self, u, v): -- cgit v1.2.1 From 3323464f85b986cba23176271da92a478b33ab9c Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 00:24:49 +0200 Subject: messy first version of a properly working depth-first graph method, which allows the pool to work as expected. Many more tests need to be added, and there still is a problem with shutdown as sometimes it won't kill all threads, mainly because the process came up with worker threads started, which cannot be --- lib/git/async/graph.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 6386cbaa..e3999cdc 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -87,25 +87,26 @@ class Graph(object): return self - def visit_input_inclusive_depth_first(self, node, visitor=lambda n: True ): - """Visit all input nodes of the given node, depth first, calling visitor - for each node on our way. If the function returns False, the traversal - will not go any deeper, but continue at the next branch - It will return the actual input node in the end !""" - nodes = node.in_nodes[:] + def input_inclusive_dfirst_reversed(self, node): + """Return all input nodes of the given node, depth first, + It will return the actual input node last, as it is required + like that by the pool""" + stack = [node] seen = set() # depth first - while nodes: - n = nodes.pop() + out = list() + while stack: + n = stack.pop() if n in seen: continue seen.add(n) + out.append(n) # only proceed in that direction if visitor is fine with it - if visitor(n): - nodes.extend(n.in_nodes) + stack.extend(n.in_nodes) # END call visitor # END while walking - visitor(node) + out.reverse() + return out -- cgit v1.2.1 From fbe062bf6dacd3ad63dd827d898337fa542931ac Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Thu, 10 Jun 2010 23:55:50 +0200 Subject: Added dependency-task tests, and fixed plenty of ref-count related bugs, as well as concurrency issues. Now it works okay, but the thread-shutdown is still an issue, as it causes incorrect behaviour making the tests fail. Its good, as it hints at additional issues that need to be solved. There is just a little more left on the feature side, but its nearly there --- lib/git/async/graph.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index e3999cdc..9ee0e891 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -25,14 +25,24 @@ class Graph(object): def __init__(self): self.nodes = list() + + def __del__(self): + """Deletes bidericational dependencies""" + for node in self.nodes: + node.in_nodes = None + node.out_nodes = None + # END cleanup nodes + + # otherwise the nodes would keep floating around + def add_node(self, node): """Add a new node to the graph :return: the newly added node""" self.nodes.append(node) return node - def del_node(self, node): + def remove_node(self, node): """Delete a node from the graph :return: self""" try: @@ -46,6 +56,8 @@ class Graph(object): del(outn.in_nodes[outn.in_nodes.index(node)]) for inn in node.in_nodes: del(inn.out_nodes[inn.out_nodes.index(node)]) + node.out_nodes = list() + node.in_nodes = list() return self def add_edge(self, u, v): -- cgit v1.2.1 From cac6e06cc9ef2903a15e594186445f3baa989a1a Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Fri, 11 Jun 2010 16:58:44 +0200 Subject: test_task: fixed import error, made all modules from x import * safe --- lib/git/async/graph.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib/git/async/graph.py') diff --git a/lib/git/async/graph.py b/lib/git/async/graph.py index 9ee0e891..4e14c81e 100644 --- a/lib/git/async/graph.py +++ b/lib/git/async/graph.py @@ -1,5 +1,7 @@ """Simplistic implementation of a graph""" +__all__ = ('Node', 'Graph') + class Node(object): """A Node in the graph. They know their neighbours, and have an id which should resolve into a string""" -- cgit v1.2.1