summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
committerTed Ross <tross@apache.org>2013-10-04 13:40:59 +0000
commit28302f5604920d7cf7941481ff376f92cdd0e535 (patch)
tree3f7e2686d87cb713f7be84f46c5b4d7eb027c116
parent20fbc65eba57a9526a39652cb1473a8551d3d97b (diff)
downloadqpid-python-28302f5604920d7cf7941481ff376f92cdd0e535.tar.gz
QPID-4967 - Work in progress on multi-router networks
- Added a feature to the hash table to allow referenced objects to hold a direct linkage back to the hash structure for fast deletion and access to the key. This allows the key to be stored in only one place and allows items to be removed without requiring a hash lookup on the key. - Completed the integration of the Python router and the C data structures that track remote routers (neighbor and multi-hop). - Allow multiple addresses in the ioAdapter from Python. - Added a separate address for the hello messages because the messaging pattern is different for these messages. - Added some content to the TODO file. - Added test configurations for a two-router network. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1529163 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/extras/dispatch/TODO47
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/compose.h3
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/hash.h12
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/link.py6
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py6
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py2
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/node.py89
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py27
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/routing.py13
-rw-r--r--qpid/extras/dispatch/src/agent.c2
-rw-r--r--qpid/extras/dispatch/src/container.c4
-rw-r--r--qpid/extras/dispatch/src/hash.c59
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c23
-rw-r--r--qpid/extras/dispatch/src/router_node.c163
-rw-r--r--qpid/extras/dispatch/src/router_private.h14
-rw-r--r--qpid/extras/dispatch/src/router_pynode.c279
-rw-r--r--qpid/extras/dispatch/tests/field_test.c4
-rw-r--r--qpid/extras/dispatch/tests/router_engine_test.py112
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py12
-rw-r--r--qpid/extras/dispatch/tests/tworouters-A.conf54
-rw-r--r--qpid/extras/dispatch/tests/tworouters-B.conf61
21 files changed, 776 insertions, 216 deletions
diff --git a/qpid/extras/dispatch/TODO b/qpid/extras/dispatch/TODO
index ab0c1a951e..0ae3988d29 100644
--- a/qpid/extras/dispatch/TODO
+++ b/qpid/extras/dispatch/TODO
@@ -5,3 +5,50 @@ enhancements to be fixed by going to the Apache Qpid JIRA instance:
http://issues.apache.org/jira/browse/QPID
==============================================================================
+
+- Router Mode:
+ o Stand-Alone-Router - Does not participate in routing protocol, does not permit inter-router
+ links, acts as a normal interior-router otherwise.
+ o Interior-Router - Participates in the routing protocol
+ o Edge-Concentrator - Does not participate in routing protocol, requires uplink connection(s)
+ This mode should be used when Dispatch is integrated into an endpoint
+ application or when it is acting as a connection concentrator.
+ Proxy and access-protocol functions will be available in this mode.
+
+- Connection Annotation:
+ o Type: Inter-router, uplink, endpoint, etc. This formal annotation can be accessed internally
+ by the connection handlers to guide Dispatch's handling of new connections.
+ o Weight-{in,out}: Weight/Cost metrics for inter-router links
+
+- Statistics for Instrumentation:
+ o Link
+ . delivery count {unsettled, pre-settled}
+ . deliveries {accepted, rejected, released, modified}
+ . octets of delivery {accepted, rejected, released, modified}
+ . flow frame count
+ . disposition frame count {forward, backward}
+ o Address
+ . deliveries {ingress, egress, transit}
+ . octets of delivery {ingress, egress, transit}
+
+- Infrastructure
+ o Router_Link - Buffer and Iterator for a copy of the link's target address (for use
+ as an address for messages with no 'to' field).
+ o Router Event Queue - Event queue to feed alerts to the Python router code.
+ Neighbor-link-loss is a valuable event because it accelerates the
+ detection of topology change.
+ o All PyRouter stimulus through a work queue.
+ o Router Code Updates
+ . Remove all vestiges of "binding"
+ . Calculate the valid-origin mask for each path
+ . Report address mappings to routers
+ o Expose idle-timeout/keepalive on connectors and listeners
+
+- Major Roadmap Features
+ o Security Policy Enforcement
+ o Proxy (Translation Node) Capability
+ o Address Provisioning with variable semantics
+ o Link Routing
+ o Management, Instrumentation, and Accounting
+ o Link Cost
+ o Area Routing
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/compose.h b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
index 7d668f0ac0..d68f6be746 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/compose.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
@@ -167,7 +167,8 @@ void dx_compose_insert_string(dx_composed_field_t *field, const char *value);
* Insert a utf8-encoded string into the field from an iterator
*
* @param field A field created by dx_compose.
- * @param value A pointer to a null-terminated string.
+ * @param iter An iterator for a string value. The caller is responsible for freeing
+ * this iterator after the call is complete.
*/
void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter);
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
index bfad142517..bf22c2c603 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/hash.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
@@ -23,16 +23,22 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/error.h>
-typedef struct hash_t hash_t;
+typedef struct hash_t hash_t;
+typedef struct hash_handle_t hash_handle_t;
hash_t *hash(int bucket_exponent, int batch_size, int value_is_const);
void hash_free(hash_t *h);
size_t hash_size(hash_t *h);
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val);
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val);
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle);
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle);
dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val);
dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val);
dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key);
+void hash_handle_free(hash_handle_t *handle);
+const unsigned char *hash_key_by_handle(const hash_handle_t *handle);
+dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle);
+
+
#endif
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
index 46e03379b4..4a770684cd 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
@@ -101,7 +101,7 @@ class LinkStateEngine(object):
if self.id not in self.collection:
return
my_ls = self.collection[self.id]
- self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
def new_local_link_state(self, link_state):
@@ -132,7 +132,7 @@ class LinkStateEngine(object):
def _send_lsrs(self):
for (_area, _id) in self.needed_lsrs.keys():
- self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageLSR(None, self.id, self.area))
self.needed_lsrs = {}
@@ -140,4 +140,4 @@ class LinkStateEngine(object):
ls_seq = 0
if self.id in self.collection:
ls_seq = self.collection[self.id].ls_seq
- self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
+ self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
index 5413dd38a8..0707f7f250 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
@@ -56,7 +56,7 @@ class MobileAddressEngine(object):
##
if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
self.mobile_seq += 1
- self.container.send('_topo.%s.all' % self.area,
+ self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
self.local_keys.extend(self.added_keys)
for key in self.deleted_keys:
@@ -162,7 +162,7 @@ class MobileAddressEngine(object):
if msg.id == self.id:
return
if msg.have_seq < self.mobile_seq:
- self.container.send('_topo.%s.%s' % (msg.area, msg.id),
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
@@ -183,6 +183,6 @@ class MobileAddressEngine(object):
def _send_mars(self):
for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
self.needed_mars = {}
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
index 6a16049065..37b7888efe 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
@@ -49,7 +49,7 @@ class NeighborEngine(object):
if now - self.last_hello_time >= self.hello_interval:
self.last_hello_time = now
- self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
+ self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
if self.link_state_changed:
self.link_state_changed = False
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
index 433ce3ab1e..b1f6df2662 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
@@ -50,10 +50,16 @@ class NodeTracker(object):
A node, designated by node_id, has been discovered as a neighbor over a link with
a maskbit of link_maskbit.
"""
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
- self.nodes[node_id].set_neighbor(link_maskbit)
- self._notify(self.nodes[node_id])
+ if node_id in self.nodes:
+ node = self.nodes[node_id]
+ if node.neighbor:
+ return
+ self.container.del_remote_router(node.maskbit)
+ node.neighbor = True
+ else:
+ node = RemoteNode(node_id, self._allocate_maskbit(), True)
+ self.nodes[node_id] = node
+ self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
def lost_neighbor(self, node_id):
@@ -61,9 +67,11 @@ class NodeTracker(object):
We have lost contact with a neighboring node node_id.
"""
node = self.nodes[node_id]
- node.clear_neighbor()
- self._notify(node)
- if node.to_delete():
+ node.neighbor = False
+ self.container.del_neighbor_router(node.maskbit)
+ if node.remote:
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
self._free_maskbit(node.maskbit)
self.nodes.pop(node_id)
@@ -74,9 +82,12 @@ class NodeTracker(object):
remote peer.
"""
if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
- self.nodes[node_id].set_remote()
- self._notify(self.nodes[node_id])
+ node = RemoteNode(node_id, self._allocate_maskbit(), False)
+ self.nodes[node_id] = node
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
+ node = self.nodes[node_id]
+ node.remote = True
def lost_node(self, node_id):
@@ -84,11 +95,21 @@ class NodeTracker(object):
A remote node, node_id, has not been heard from for too long and is being deemed lost.
"""
node = self.nodes[node_id]
- node.clear_remote()
- self._notify(node)
- if node.to_delete():
- self._free_maskbit(node.maskbit)
- self.nodes.pop(node_id)
+ if node.remote:
+ node.remote = False
+ if not node.neighbor:
+ self.container.del_remote_router(node.maskbit)
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def maskbit_for_node(self, node_id):
+ """
+ """
+ node = self.nodes[node_id]
+ if node:
+ return node.maskbit
+ return None
def _allocate_maskbit(self):
@@ -110,39 +131,15 @@ class NodeTracker(object):
self.next_maskbit = i
- def _notify(self, node):
- if node.to_delete():
- self.container.node_updated("R%s" % node.id, 0, 0, 0, 0)
- else:
- is_neighbor = 0
- if node.neighbor:
- is_neighbor = 1
- self.container.node_updated("R%s" % node.id, 1, is_neighbor, node.link_maskbit, node.maskbit)
+ def _address(self, node_id):
+ return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
class RemoteNode(object):
- def __init__(self, node_id, maskbit):
- self.id = node_id
- self.neighbor = None
- self.link_maskbit = None
- self.maskbit = maskbit
- self.remote = None
-
- def set_neighbor(self, link_maskbit):
- self.neighbor = True
- self.link_maskbit = link_maskbit
-
- def set_remote(self):
- self.remote = True
-
- def clear_neighbor(self):
- self.neighbor = None
- self.link_maskbit = None
-
- def clear_remote(self):
- self.remote = None
-
- def to_delete(self):
- return self.neighbor == None and self.remote == None
+ def __init__(self, node_id, maskbit, neighbor):
+ self.id = node_id
+ self.maskbit = maskbit
+ self.neighbor = neighbor
+ self.remote = not neighbor
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index 6fde822444..c2c623f980 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -55,7 +55,7 @@ class RouterEngine:
self.domain = "domain"
self.router_adapter = router_adapter
self.log_adapter = LogAdapter("dispatch.router")
- self.io_adapter = IoAdapter(self, "qdxrouter")
+ self.io_adapter = IoAdapter(self, ("qdxrouter", "qdxhello"))
self.max_routers = max_routers
self.id = router_id
self.area = area
@@ -71,14 +71,14 @@ class RouterEngine:
##
## Launch the sub-module engines
##
+ self.node_tracker = NodeTracker(self, self.max_routers)
self.neighbor_engine = NeighborEngine(self)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
self.mobile_address_engine = MobileAddressEngine(self)
- self.routing_table_engine = RoutingTableEngine(self)
+ self.routing_table_engine = RoutingTableEngine(self, self.node_tracker)
self.binding_engine = BindingEngine(self)
self.adapter_engine = AdapterEngine(self)
- self.node_tracker = NodeTracker(self, self.max_routers)
@@ -278,7 +278,20 @@ class RouterEngine:
self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
self.node_tracker.lost_node(rid)
- def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
- self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
- (address, reachable, neighbor, link_bit, router_bit))
- self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit)
+ def add_neighbor_router(self, address, router_bit, link_bit):
+ self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \
+ (address, router_bit, link_bit))
+ self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
+
+ def del_neighbor_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_neighbor_router(router_bit)
+
+ def add_remote_router(self, address, router_bit):
+ self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit))
+ self.router_adapter.add_remote_router(address, router_bit)
+
+ def del_remote_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_remote_router(router_bit)
+
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
index 39b34bbc9b..af974b402a 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
@@ -27,8 +27,9 @@ class RoutingTableEngine(object):
This module is responsible for converting the set of next hops to remote routers to a routing
table in the "topological" address class.
"""
- def __init__(self, container):
+ def __init__(self, container, node_tracker):
self.container = container
+ self.node_tracker = node_tracker
self.id = self.container.id
self.area = self.container.area
self.next_hops = {}
@@ -41,14 +42,10 @@ class RoutingTableEngine(object):
def next_hops_changed(self, next_hops):
# Convert next_hops into routing table
self.next_hops = next_hops
- new_table = []
for _id, next_hop in next_hops.items():
- new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
- pair = ('_topo.%s.all' % (self.area), next_hop)
- if new_table.count(pair) == 0:
- new_table.append(pair)
-
- self.container.remote_routes_changed('topological', new_table)
+ mb_id = self.node_tracker.maskbit_for_node(_id)
+ mb_nh = self.node_tracker.maskbit_for_node(next_hop)
+ self.container.router_adapter.set_next_hop(mb_id, mb_nh)
def get_next_hops(self):
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 2063b8fcc4..3aad9f3427 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx,
cls->query_handler = query_handler;
dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
- int result = hash_insert_const(agent->class_hash, iter, cls);
+ int result = hash_insert_const(agent->class_hash, iter, cls, 0);
dx_field_iterator_free(iter);
if (result < 0)
assert(false);
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 3500e08406..123e2ffb60 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt)
nt_item->ntype = nt;
sys_mutex_lock(container->lock);
- result = hash_insert_const(container->node_type_map, iter, nt);
+ result = hash_insert_const(container->node_type_map, iter, nt, 0);
DEQ_INSERT_TAIL(container->node_type_list, nt_item);
sys_mutex_unlock(container->lock);
@@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_dispatch_t *dx,
if (name) {
dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
- result = hash_insert(container->node_map, iter, node);
+ result = hash_insert(container->node_map, iter, node, 0);
sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0) {
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index 1f7d8aa3f5..49f79925f6 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -52,6 +52,15 @@ struct hash_t {
};
+struct hash_handle_t {
+ bucket_t *bucket;
+ hash_item_t *item;
+};
+
+ALLOC_DECLARE(hash_handle_t);
+ALLOC_DEFINE(hash_handle_t);
+
+
// djb2 hash algorithm
static unsigned long hash_function(dx_field_iterator_t *iter)
{
@@ -102,7 +111,7 @@ size_t hash_size(hash_t *h)
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists, hash_handle_t **handle)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
@@ -115,6 +124,8 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
if (item) {
*exists = 1;
+ if (handle)
+ *handle = 0;
return item;
}
@@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
*exists = 0;
+
+ //
+ // If a pointer to a handle-pointer was supplied, create a handle for this item.
+ //
+ if (handle) {
+ *handle = new_hash_handle_t();
+ (*handle)->bucket = &h->buckets[idx];
+ (*handle)->item = item;
+ }
+
return item;
}
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle)
{
int exists = 0;
- hash_item_t *item = hash_internal_insert(h, key, &exists);
+ hash_item_t *item = hash_internal_insert(h, key, &exists, handle);
if (!item)
return DX_ERROR_ALLOC;
@@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
}
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle)
{
assert(h->is_const);
int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ hash_item_t *item = hash_internal_insert(h, key, &error, handle);
if (item)
item->v.val_const = val;
@@ -225,3 +246,31 @@ dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
return DX_ERROR_NOT_FOUND;
}
+
+void hash_handle_free(hash_handle_t *handle)
+{
+ if (handle)
+ free_hash_handle_t(handle);
+}
+
+
+const unsigned char *hash_key_by_handle(const hash_handle_t *handle)
+{
+ if (handle)
+ return handle->item->key;
+ return 0;
+}
+
+
+dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle)
+{
+ if (!handle)
+ return DX_ERROR_NOT_FOUND;
+ free(handle->item->key);
+ DEQ_REMOVE(handle->bucket->items, handle->item);
+ free_hash_item_t(handle->item);
+ h->size--;
+ free_hash_handle_t(handle);
+ return DX_ERROR_NONE;
+}
+
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index a3c41f86f5..c7c8f82dfc 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -398,7 +398,8 @@ typedef struct {
PyObject *handler;
PyObject *handler_rx_call;
dx_dispatch_t *dx;
- dx_address_t *address;
+ Py_ssize_t addr_count;
+ dx_address_t **addrs;
} IoAdapter;
@@ -469,25 +470,35 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
{
- const char *address;
- if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
+ PyObject *addrs;
+ if (!PyArg_ParseTuple(args, "OO", &self->handler, &addrs))
return -1;
self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
return -1;
+ if (!PyTuple_Check(addrs))
+ return -1;
+
Py_INCREF(self->handler);
Py_INCREF(self->handler_rx_call);
- self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
+ self->dx = dispatch;
+ self->addr_count = PyTuple_Size(addrs);
+ self->addrs = NEW_PTR_ARRAY(dx_address_t, self->addr_count);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ self->addrs[idx] = dx_router_register_address(self->dx,
+ PyString_AS_STRING(PyTuple_GetItem(addrs, idx)),
+ dx_io_rx_handler, self);
return 0;
}
static void IoAdapter_dealloc(IoAdapter* self)
{
- dx_router_unregister_address(self->address);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ dx_router_unregister_address(self->addrs[idx]);
+ free(self->addrs);
Py_DECREF(self->handler);
Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index fea3695620..35fb834029 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -28,8 +28,9 @@
static char *module = "ROUTER";
-static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
+static char *direct_prefix;
/**
* Address Types and Processing:
@@ -55,7 +56,7 @@ ALLOC_DEFINE(dx_address_t);
ALLOC_DEFINE(dx_router_conn_t);
-static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
DEQ_ITEM_INIT(ref);
@@ -65,7 +66,7 @@ static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
-static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
if (link->ref) {
DEQ_REMOVE(*ref_list, link->ref);
@@ -75,6 +76,31 @@ static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = new_dx_router_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->router = rnode;
+ rnode->ref_count++;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = DEQ_HEAD(*ref_list);
+ while (ref) {
+ if (ref->router == rnode) {
+ DEQ_REMOVE(*ref_list, ref);
+ free_dx_router_ref_t(ref);
+ rnode->ref_count--;
+ break;
+ }
+ ref = DEQ_NEXT(ref);
+ }
+}
+
+
/**
* Check an address to see if it no longer has any associated destinations.
* Depending on its policy, the address may be eligible for being closed out
@@ -262,10 +288,11 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
{
- dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
- dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_field_iterator_t *ingress_iter = 0;
dx_parsed_field_t *trace = 0;
dx_parsed_field_t *ingress = 0;
@@ -293,7 +320,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
trace_item = dx_parse_sub_value(trace, idx);
}
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_list(out_da);
}
@@ -303,15 +330,17 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
//
dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
- dx_field_iterator_t *iter = dx_parse_raw(ingress);
- dx_compose_insert_string_iterator(out_da, iter);
+ ingress_iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, ingress_iter);
} else
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_map(out_da);
dx_message_set_delivery_annotations(msg, out_da);
dx_compose_free(out_da);
+
+ return ingress_iter;
}
@@ -394,7 +423,8 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_direct = dx_field_iterator_prefix(iter, direct_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -404,9 +434,10 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
//
- // Interpret and update the delivery annotations of the message
+ // Interpret and update the delivery annotations of the message. As a convenience,
+ // this function returns the iterator to the ingress field (if it exists).
//
- router_annotate_message(router, msg);
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
//
// Forward to the in-process handler for this message if there is one. The
@@ -446,31 +477,54 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
}
//
- // Forward to the next-hops for remote destinations.
+ // If the address form is direct to this router node, don't relay it on
+ // to any other part of the network.
//
- dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- dx_router_link_t *dest_link;
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link) {
- dx_routed_event_t *re = new_dx_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = dx_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- fanout++;
- if (fanout == 1)
- re->delivery = delivery;
-
- dx_link_activate(dest_link->link);
+ if (!is_direct) {
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ //
+ int origin = -1;
+ if (ingress_iter) {
+ dx_address_t *origin_addr;
+ hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
}
- dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -633,10 +687,10 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
if (is_router) {
//
- // If this is a router link, put it in the router_address link-list.
+ // If this is a router link, put it in the hello_address link-list.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = router->router_addr;
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
+ rlink->owning_addr = router->hello_addr;
router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
} else {
@@ -667,7 +721,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -806,7 +860,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
rlink->link_direction = DX_OUTGOING;
- rlink->owning_addr = router->router_addr;
+ rlink->owning_addr = router->hello_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -814,9 +868,9 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
//
- // Add the new outgoing link to the router_address's list of links.
+ // Add the new outgoing link to the hello_address's list of links.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
//
// Index this link from the by-maskbit index so we can later find it quickly
@@ -867,6 +921,14 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_container_register_node_type(dx, &router_node);
}
+ size_t dplen = 9 + strlen(area) + strlen(id);
+ direct_prefix = (char*) malloc(dplen);
+ strcpy(direct_prefix, "_topo/");
+ strcat(direct_prefix, area);
+ strcat(direct_prefix, "/");
+ strcat(direct_prefix, id);
+ strcat(direct_prefix, "/");
+
dx_router_t *router = NEW(dx_router_t);
router_node.type_context = router;
@@ -883,8 +945,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
DEQ_INIT(router->routers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width());
- for (int idx = 0; idx < dx_bitmask_width(); idx++)
+ router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width());
+ for (int idx = 0; idx < dx_bitmask_width(); idx++) {
router->out_links_by_mask_bit[idx] = 0;
+ router->routers_by_mask_bit[idx] = 0;
+ }
router->neighbor_free_mask = dx_bitmask(1);
router->lock = sys_mutex();
@@ -894,10 +959,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router->pyTick = 0;
//
- // Create an address for all of the routers in the topology. It will be registered
+ // Create addresses for all of the routers in the topology. It will be registered
// locally later in the initialization sequence.
//
router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
+ router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -935,8 +1001,7 @@ void dx_router_free(dx_router_t *router)
const char *dx_router_id(const dx_dispatch_t *dx)
{
- dx_router_t *router = dx->router;
- return router->router_id;
+ return direct_prefix;
}
@@ -963,7 +1028,7 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
index 7f1f7486fb..a21e4f1d5c 100644
--- a/qpid/extras/dispatch/src/router_private.h
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -66,10 +66,11 @@ DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
struct dx_router_node_t {
DEQ_LINKS(dx_router_node_t);
- const char *id;
+ dx_address_t *owning_addr;
int mask_bit;
dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
+ uint32_t ref_count;
dx_bitmask_t *valid_origins;
};
@@ -107,6 +108,7 @@ struct dx_address_t {
void *handler_context; // In-Process Consumer context
dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
+ hash_handle_t *hash_handle; // Linkage back to the hash table entry
};
ALLOC_DECLARE(dx_address_t);
@@ -122,10 +124,12 @@ struct dx_router_t {
dx_address_list_t addrs;
hash_t *addr_hash;
dx_address_t *router_addr;
+ dx_address_t *hello_addr;
dx_router_link_list_t links;
dx_router_node_list_t routers;
dx_router_link_t **out_links_by_mask_bit;
+ dx_router_node_t **routers_by_mask_bit;
dx_bitmask_t *neighbor_free_mask;
sys_mutex_t *lock;
@@ -136,4 +140,12 @@ struct dx_router_t {
PyObject *pyTick;
};
+
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+
+
#endif
diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c
index 9930c5dde9..7ada6dad4b 100644
--- a/qpid/extras/dispatch/src/router_pynode.c
+++ b/qpid/extras/dispatch/src/router_pynode.c
@@ -34,34 +34,268 @@ typedef struct {
} RouterAdapter;
-static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1)
+ return "Link bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] != 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding router over already existing router";
+ }
+
+ if (link_maskbit >= 0 && router->out_links_by_mask_bit[link_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding neighbor router with invalid link reference";
+ }
+
+ //
+ // Hash lookup the address to ensure there isn't an existing router address.
+ //
+ dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ assert(addr == 0);
+
+ //
+ // Create an address record for this router and insert it in the hash table.
+ // This record will be found whenever a "foreign" topological address to this
+ // remote router is looked up.
+ //
+ addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+
+ //
+ // Create a router-node record to represent the remote router.
+ //
+ dx_router_node_t *rnode = new_dx_router_node_t();
+ DEQ_ITEM_INIT(rnode);
+ rnode->owning_addr = addr;
+ rnode->mask_bit = router_maskbit;
+ rnode->next_hop = 0;
+ rnode->peer_link = 0;
+ rnode->ref_count = 0;
+ rnode->valid_origins = dx_bitmask(0);
+
+ DEQ_INSERT_TAIL(router->routers, rnode);
+
+ //
+ // Link the router record to the address record.
+ //
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ //
+ // Link the router record to the router address record.
+ //
+ dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
+
+ //
+ // Add the router record to the mask-bit index.
+ //
+ router->routers_by_mask_bit[router_maskbit] = rnode;
+
+ //
+ // If this is a neighbor router, add the peer_link reference to the
+ // router record.
+ //
+ if (link_maskbit >= 0)
+ rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static char *dx_del_router(dx_router_t *router, int router_maskbit)
+{
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Deleting nonexistent router";
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ dx_address_t *oaddr = rnode->owning_addr;
+ assert(oaddr);
+
+ //
+ // Unlink the router node from the address record
+ //
+ dx_router_del_node_ref_LH(&oaddr->rnodes, rnode);
+
+ //
+ // While the router node has a non-zero reference count, look for addresses
+ // to unlink the node from.
+ //
+ dx_address_t *addr = DEQ_HEAD(router->addrs);
+ while (addr && rnode->ref_count > 0) {
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ addr = DEQ_NEXT(addr);
+ }
+ assert(rnode->ref_count == 0);
+
+ //
+ // Free the router node and the owning address records.
+ //
+ dx_bitmask_free(rnode->valid_origins);
+ free_dx_router_node_t(rnode);
+
+ hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
+ DEQ_REMOVE(router->addrs, oaddr);
+ hash_handle_free(oaddr->hash_handle);
+ router->routers_by_mask_bit[router_maskbit] = 0;
+ free_dx_address_t(oaddr);
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static PyObject* dx_add_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
const char *address;
- int is_reachable;
- int is_neighbor;
- int link_maskbit;
int router_maskbit;
- if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
- &link_maskbit, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
- // TODO
+ char *error = dx_add_router(router, address, router_maskbit, -1);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
Py_INCREF(Py_None);
return Py_None;
}
-static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
+static PyObject* dx_del_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_set_next_hop(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+ int next_hop_maskbit;
+
+ if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
+ return 0;
+
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop Not Found");
+ return 0;
+ }
+
+ if (router_maskbit != next_hop_maskbit) {
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *address;
+ int router_maskbit;
+ int link_maskbit;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit))
+ return 0;
+
+ char *error = dx_add_router(router, address, router_maskbit, link_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -71,13 +305,14 @@ static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
}
-static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
+static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -88,9 +323,13 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
static PyMethodDef RouterAdapter_methods[] = {
- {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
+ {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
+ {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
+ {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
+ {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
+ {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
{0, 0, 0, 0}
};
diff --git a/qpid/extras/dispatch/tests/field_test.c b/qpid/extras/dispatch/tests/field_test.c
index 59fc3c4cfb..33f9fe5d03 100644
--- a/qpid/extras/dispatch/tests/field_test.c
+++ b/qpid/extras/dispatch/tests/field_test.c
@@ -115,6 +115,10 @@ static char* test_view_address_hash(void *context)
{"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"},
{"amqp:/_topo/all/all/local/sub", "Llocal/sub"},
{"amqp://host:port/_local/my-addr", "Lmy-addr"},
+ {"_topo/area/router/my-addr", "Aarea"},
+ {"_topo/my-area/router/my-addr", "Rrouter"},
+ {"_topo/my-area/my-router/my-addr", "Lmy-addr"},
+ {"_topo/my-area/router", "Rrouter"},
{0, 0}
};
int idx;
diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py
index 7e3940fda4..47917b0f5f 100644
--- a/qpid/extras/dispatch/tests/router_engine_test.py
+++ b/qpid/extras/dispatch/tests/router_engine_test.py
@@ -97,62 +97,74 @@ class NodeTrackerTest(unittest.TestCase):
def log(self, level, text):
pass
- def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+ def add_neighbor_router(self, address, router_bit, link_bit):
self.address = address
- self.reachable = reachable
- self.neighbor = neighbor
+ self.router_bit = router_bit
self.link_bit = link_bit
+ self.calls += 1
+
+ def del_neighbor_router(self, router_bit):
+ self.address = None
self.router_bit = router_bit
+ self.link_bit = None
+ self.calls += 1
- def reset(self):
+ def add_remote_router(self, address, router_bit):
+ self.address = address
+ self.router_bit = router_bit
+ self.link_bit = None
+ self.calls += 1
+
+ def del_remote_router(self, router_bit):
self.address = None
- self.reachable = None
- self.neighbor = None
+ self.router_bit = router_bit
self.link_bit = None
+ self.calls += 1
+
+ def reset(self):
+ self.address = None
self.router_bit = None
+ self.link_bit = None
+ self.area = "area"
+ self.calls = 0
def test_node_tracker_limits(self):
tracker = NodeTracker(self, 5)
self.reset()
tracker.new_neighbor('A', 1)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 1)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('B', 5)
- self.assertEqual(self.address, 'RB')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/B')
self.assertEqual(self.link_bit, 5)
self.assertEqual(self.router_bit, 1)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('C', 6)
- self.assertEqual(self.address, 'RC')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/C')
self.assertEqual(self.link_bit, 6)
self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('D', 7)
- self.assertEqual(self.address, 'RD')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/D')
self.assertEqual(self.link_bit, 7)
self.assertEqual(self.router_bit, 3)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('E', 8)
- self.assertEqual(self.address, 'RE')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/E')
self.assertEqual(self.link_bit, 8)
self.assertEqual(self.router_bit, 4)
+ self.assertEqual(self.calls, 1)
self.reset()
try:
@@ -163,16 +175,15 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.lost_neighbor('C')
- self.assertEqual(self.address, 'RC')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('F', 9)
- self.assertEqual(self.address, 'RF')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/F')
self.assertEqual(self.link_bit, 9)
self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
def test_node_tracker_remote_neighbor(self):
@@ -180,32 +191,29 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.new_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertFalse(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertFalse(self.link_bit)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('A', 3)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 3)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 2)
self.reset()
tracker.lost_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
- self.assertEqual(self.link_bit, 3)
- self.assertEqual(self.router_bit, 0)
+ self.assertFalse(self.address)
+ self.assertFalse(self.link_bit)
+ self.assertFalse(self.router_bit)
+ self.assertEqual(self.calls, 0)
self.reset()
tracker.lost_neighbor('A')
- self.assertEqual(self.address, 'RA')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
def test_node_tracker_neighbor_remote(self):
@@ -213,32 +221,28 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.new_neighbor('A', 3)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 3)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
- self.assertEqual(self.link_bit, 3)
- self.assertEqual(self.router_bit, 0)
+ self.assertFalse(self.address)
+ self.assertFalse(self.link_bit)
+ self.assertFalse(self.router_bit)
+ self.assertEqual(self.calls, 0)
self.reset()
tracker.lost_neighbor('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertFalse(self.neighbor)
- self.assertFalse(self.link_bit)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 2)
self.reset()
tracker.lost_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
class NeighborTest(unittest.TestCase):
@@ -274,7 +278,7 @@ class NeighborTest(unittest.TestCase):
self.engine.tick(1.5)
self.assertEqual(len(self.sent), 1)
dest, msg = self.sent.pop(0)
- self.assertEqual(dest, "_local/qdxrouter")
+ self.assertEqual(dest, "amqp:/_local/qdxhello")
self.assertEqual(msg.get_opcode(), "HELLO")
self.assertEqual(msg.id, self.id)
self.assertEqual(msg.area, self.area)
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
index b25568d283..8d486e2b85 100644
--- a/qpid/extras/dispatch/tests/system_tests_one_router.py
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -310,7 +310,7 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
self.assertFalse('qdx.trace' in da)
##
@@ -346,7 +346,7 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
self.assertFalse('qdx.trace' in da)
##
@@ -364,8 +364,8 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
- self.assertEqual(da['qdx.trace'], ['Qpid.Dispatch.Router.A'])
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Non-empty trace
@@ -382,8 +382,8 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
- self.assertEqual(da['qdx.trace'], ['first.hop', 'Qpid.Dispatch.Router.A'])
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
+ self.assertEqual(da['qdx.trace'], ['first.hop', '_topo/area/Qpid.Dispatch.Router.A/'])
M1.stop()
M2.stop()
diff --git a/qpid/extras/dispatch/tests/tworouters-A.conf b/qpid/extras/dispatch/tests/tworouters-A.conf
new file mode 100644
index 0000000000..b4a5770c2d
--- /dev/null
+++ b/qpid/extras/dispatch/tests/tworouters-A.conf
@@ -0,0 +1,54 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+ ##
+ ## worker-threads - The number of threads that will be created to
+ ## process message traffic and other application work (timers, non-amqp
+ ## file descriptors, etc.)
+ ##
+ ## The number of threads should be related to the number of available
+ ## processor cores. To fully utilize a quad-core system, set the
+ ## number of threads to 4.
+ ##
+ worker-threads: 4
+
+ ##
+ ## container-name - The name of the AMQP container. If not specified,
+ ## the container name will be set to a value of the container's
+ ## choosing. The automatically assigned container name is not
+ ## guaranteed to be persistent across restarts of the container.
+ ##
+ container-name: Qpid.Dispatch.Router.A
+}
+
+
+##
+## Listeners and Connectors
+##
+listener {
+ addr: 0.0.0.0
+ port: 20001
+ sasl-mechanisms: ANONYMOUS
+}
+
diff --git a/qpid/extras/dispatch/tests/tworouters-B.conf b/qpid/extras/dispatch/tests/tworouters-B.conf
new file mode 100644
index 0000000000..d56bd45078
--- /dev/null
+++ b/qpid/extras/dispatch/tests/tworouters-B.conf
@@ -0,0 +1,61 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+ ##
+ ## worker-threads - The number of threads that will be created to
+ ## process message traffic and other application work (timers, non-amqp
+ ## file descriptors, etc.)
+ ##
+ ## The number of threads should be related to the number of available
+ ## processor cores. To fully utilize a quad-core system, set the
+ ## number of threads to 4.
+ ##
+ worker-threads: 4
+
+ ##
+ ## container-name - The name of the AMQP container. If not specified,
+ ## the container name will be set to a value of the container's
+ ## choosing. The automatically assigned container name is not
+ ## guaranteed to be persistent across restarts of the container.
+ ##
+ container-name: Qpid.Dispatch.Router.B
+}
+
+
+##
+## Listeners and Connectors
+##
+listener {
+ addr: 0.0.0.0
+ port: 20002
+ sasl-mechanisms: ANONYMOUS
+}
+
+connector {
+ label: Router Uplink
+ addr: 0.0.0.0
+ port: 20001
+ sasl-mechanisms: ANONYMOUS
+}
+