summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/extras/dispatch/TODO47
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/compose.h3
-rw-r--r--qpid/extras/dispatch/include/qpid/dispatch/hash.h12
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/link.py6
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py6
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py2
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/node.py89
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py27
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/routing.py13
-rw-r--r--qpid/extras/dispatch/src/agent.c2
-rw-r--r--qpid/extras/dispatch/src/container.c4
-rw-r--r--qpid/extras/dispatch/src/hash.c59
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c23
-rw-r--r--qpid/extras/dispatch/src/router_node.c163
-rw-r--r--qpid/extras/dispatch/src/router_private.h14
-rw-r--r--qpid/extras/dispatch/src/router_pynode.c279
-rw-r--r--qpid/extras/dispatch/tests/field_test.c4
-rw-r--r--qpid/extras/dispatch/tests/router_engine_test.py112
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py12
-rw-r--r--qpid/extras/dispatch/tests/tworouters-A.conf54
-rw-r--r--qpid/extras/dispatch/tests/tworouters-B.conf61
21 files changed, 776 insertions, 216 deletions
diff --git a/qpid/extras/dispatch/TODO b/qpid/extras/dispatch/TODO
index ab0c1a951e..0ae3988d29 100644
--- a/qpid/extras/dispatch/TODO
+++ b/qpid/extras/dispatch/TODO
@@ -5,3 +5,50 @@ enhancements to be fixed by going to the Apache Qpid JIRA instance:
http://issues.apache.org/jira/browse/QPID
==============================================================================
+
+- Router Mode:
+ o Stand-Alone-Router - Does not participate in routing protocol, does not permit inter-router
+ links, acts as a normal interior-router otherwise.
+ o Interior-Router - Participates in the routing protocol
+ o Edge-Concentrator - Does not participate in routing protocol, requires uplink connection(s)
+ This mode should be used when Dispatch is integrated into an endpoint
+ application or when it is acting as a connection concentrator.
+ Proxy and access-protocol functions will be available in this mode.
+
+- Connection Annotation:
+ o Type: Inter-router, uplink, endpoint, etc. This formal annotation can be accessed internally
+ by the connection handlers to guide Dispatch's handling of new connections.
+ o Weight-{in,out}: Weight/Cost metrics for inter-router links
+
+- Statistics for Instrumentation:
+ o Link
+ . delivery count {unsettled, pre-settled}
+ . deliveries {accepted, rejected, released, modified}
+ . octets of delivery {accepted, rejected, released, modified}
+ . flow frame count
+ . disposition frame count {forward, backward}
+ o Address
+ . deliveries {ingress, egress, transit}
+ . octets of delivery {ingress, egress, transit}
+
+- Infrastructure
+ o Router_Link - Buffer and Iterator for a copy of the link's target address (for use
+ as an address for messages with no 'to' field).
+ o Router Event Queue - Event queue to feed alerts to the Python router code.
+ Neighbor-link-loss is a valuable event because it accelerates the
+ detection of topology change.
+ o All PyRouter stimulus through a work queue.
+ o Router Code Updates
+ . Remove all vestiges of "binding"
+ . Calculate the valid-origin mask for each path
+ . Report address mappings to routers
+ o Expose idle-timeout/keepalive on connectors and listeners
+
+- Major Roadmap Features
+ o Security Policy Enforcement
+ o Proxy (Translation Node) Capability
+ o Address Provisioning with variable semantics
+ o Link Routing
+ o Management, Instrumentation, and Accounting
+ o Link Cost
+ o Area Routing
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/compose.h b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
index 7d668f0ac0..d68f6be746 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/compose.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/compose.h
@@ -167,7 +167,8 @@ void dx_compose_insert_string(dx_composed_field_t *field, const char *value);
* Insert a utf8-encoded string into the field from an iterator
*
* @param field A field created by dx_compose.
- * @param value A pointer to a null-terminated string.
+ * @param iter An iterator for a string value. The caller is responsible for freeing
+ * this iterator after the call is complete.
*/
void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter);
diff --git a/qpid/extras/dispatch/include/qpid/dispatch/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
index bfad142517..bf22c2c603 100644
--- a/qpid/extras/dispatch/include/qpid/dispatch/hash.h
+++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h
@@ -23,16 +23,22 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/error.h>
-typedef struct hash_t hash_t;
+typedef struct hash_t hash_t;
+typedef struct hash_handle_t hash_handle_t;
hash_t *hash(int bucket_exponent, int batch_size, int value_is_const);
void hash_free(hash_t *h);
size_t hash_size(hash_t *h);
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val);
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val);
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle);
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle);
dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val);
dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val);
dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key);
+void hash_handle_free(hash_handle_t *handle);
+const unsigned char *hash_key_by_handle(const hash_handle_t *handle);
+dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle);
+
+
#endif
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
index 46e03379b4..4a770684cd 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py
@@ -101,7 +101,7 @@ class LinkStateEngine(object):
if self.id not in self.collection:
return
my_ls = self.collection[self.id]
- self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls))
def new_local_link_state(self, link_state):
@@ -132,7 +132,7 @@ class LinkStateEngine(object):
def _send_lsrs(self):
for (_area, _id) in self.needed_lsrs.keys():
- self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageLSR(None, self.id, self.area))
self.needed_lsrs = {}
@@ -140,4 +140,4 @@ class LinkStateEngine(object):
ls_seq = 0
if self.id in self.collection:
ls_seq = self.collection[self.id].ls_seq
- self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
+ self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq))
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
index 5413dd38a8..0707f7f250 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
@@ -56,7 +56,7 @@ class MobileAddressEngine(object):
##
if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
self.mobile_seq += 1
- self.container.send('_topo.%s.all' % self.area,
+ self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
self.local_keys.extend(self.added_keys)
for key in self.deleted_keys:
@@ -162,7 +162,7 @@ class MobileAddressEngine(object):
if msg.id == self.id:
return
if msg.have_seq < self.mobile_seq:
- self.container.send('_topo.%s.%s' % (msg.area, msg.id),
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
@@ -183,6 +183,6 @@ class MobileAddressEngine(object):
def _send_mars(self):
for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
+ self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
self.needed_mars = {}
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
index 6a16049065..37b7888efe 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py
@@ -49,7 +49,7 @@ class NeighborEngine(object):
if now - self.last_hello_time >= self.hello_interval:
self.last_hello_time = now
- self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
+ self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys()))
if self.link_state_changed:
self.link_state_changed = False
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
index 433ce3ab1e..b1f6df2662 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
@@ -50,10 +50,16 @@ class NodeTracker(object):
A node, designated by node_id, has been discovered as a neighbor over a link with
a maskbit of link_maskbit.
"""
- if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
- self.nodes[node_id].set_neighbor(link_maskbit)
- self._notify(self.nodes[node_id])
+ if node_id in self.nodes:
+ node = self.nodes[node_id]
+ if node.neighbor:
+ return
+ self.container.del_remote_router(node.maskbit)
+ node.neighbor = True
+ else:
+ node = RemoteNode(node_id, self._allocate_maskbit(), True)
+ self.nodes[node_id] = node
+ self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit)
def lost_neighbor(self, node_id):
@@ -61,9 +67,11 @@ class NodeTracker(object):
We have lost contact with a neighboring node node_id.
"""
node = self.nodes[node_id]
- node.clear_neighbor()
- self._notify(node)
- if node.to_delete():
+ node.neighbor = False
+ self.container.del_neighbor_router(node.maskbit)
+ if node.remote:
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
self._free_maskbit(node.maskbit)
self.nodes.pop(node_id)
@@ -74,9 +82,12 @@ class NodeTracker(object):
remote peer.
"""
if node_id not in self.nodes:
- self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit())
- self.nodes[node_id].set_remote()
- self._notify(self.nodes[node_id])
+ node = RemoteNode(node_id, self._allocate_maskbit(), False)
+ self.nodes[node_id] = node
+ self.container.add_remote_router(self._address(node.id), node.maskbit)
+ else:
+ node = self.nodes[node_id]
+ node.remote = True
def lost_node(self, node_id):
@@ -84,11 +95,21 @@ class NodeTracker(object):
A remote node, node_id, has not been heard from for too long and is being deemed lost.
"""
node = self.nodes[node_id]
- node.clear_remote()
- self._notify(node)
- if node.to_delete():
- self._free_maskbit(node.maskbit)
- self.nodes.pop(node_id)
+ if node.remote:
+ node.remote = False
+ if not node.neighbor:
+ self.container.del_remote_router(node.maskbit)
+ self._free_maskbit(node.maskbit)
+ self.nodes.pop(node_id)
+
+
+ def maskbit_for_node(self, node_id):
+ """
+ """
+ node = self.nodes[node_id]
+ if node:
+ return node.maskbit
+ return None
def _allocate_maskbit(self):
@@ -110,39 +131,15 @@ class NodeTracker(object):
self.next_maskbit = i
- def _notify(self, node):
- if node.to_delete():
- self.container.node_updated("R%s" % node.id, 0, 0, 0, 0)
- else:
- is_neighbor = 0
- if node.neighbor:
- is_neighbor = 1
- self.container.node_updated("R%s" % node.id, 1, is_neighbor, node.link_maskbit, node.maskbit)
+ def _address(self, node_id):
+ return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
class RemoteNode(object):
- def __init__(self, node_id, maskbit):
- self.id = node_id
- self.neighbor = None
- self.link_maskbit = None
- self.maskbit = maskbit
- self.remote = None
-
- def set_neighbor(self, link_maskbit):
- self.neighbor = True
- self.link_maskbit = link_maskbit
-
- def set_remote(self):
- self.remote = True
-
- def clear_neighbor(self):
- self.neighbor = None
- self.link_maskbit = None
-
- def clear_remote(self):
- self.remote = None
-
- def to_delete(self):
- return self.neighbor == None and self.remote == None
+ def __init__(self, node_id, maskbit, neighbor):
+ self.id = node_id
+ self.maskbit = maskbit
+ self.neighbor = neighbor
+ self.remote = not neighbor
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index 6fde822444..c2c623f980 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -55,7 +55,7 @@ class RouterEngine:
self.domain = "domain"
self.router_adapter = router_adapter
self.log_adapter = LogAdapter("dispatch.router")
- self.io_adapter = IoAdapter(self, "qdxrouter")
+ self.io_adapter = IoAdapter(self, ("qdxrouter", "qdxhello"))
self.max_routers = max_routers
self.id = router_id
self.area = area
@@ -71,14 +71,14 @@ class RouterEngine:
##
## Launch the sub-module engines
##
+ self.node_tracker = NodeTracker(self, self.max_routers)
self.neighbor_engine = NeighborEngine(self)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
self.mobile_address_engine = MobileAddressEngine(self)
- self.routing_table_engine = RoutingTableEngine(self)
+ self.routing_table_engine = RoutingTableEngine(self, self.node_tracker)
self.binding_engine = BindingEngine(self)
self.adapter_engine = AdapterEngine(self)
- self.node_tracker = NodeTracker(self, self.max_routers)
@@ -278,7 +278,20 @@ class RouterEngine:
self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
self.node_tracker.lost_node(rid)
- def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
- self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
- (address, reachable, neighbor, link_bit, router_bit))
- self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit)
+ def add_neighbor_router(self, address, router_bit, link_bit):
+ self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \
+ (address, router_bit, link_bit))
+ self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
+
+ def del_neighbor_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_neighbor_router(router_bit)
+
+ def add_remote_router(self, address, router_bit):
+ self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit))
+ self.router_adapter.add_remote_router(address, router_bit)
+
+ def del_remote_router(self, router_bit):
+ self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit)
+ self.router_adapter.del_remote_router(router_bit)
+
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
index 39b34bbc9b..af974b402a 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py
@@ -27,8 +27,9 @@ class RoutingTableEngine(object):
This module is responsible for converting the set of next hops to remote routers to a routing
table in the "topological" address class.
"""
- def __init__(self, container):
+ def __init__(self, container, node_tracker):
self.container = container
+ self.node_tracker = node_tracker
self.id = self.container.id
self.area = self.container.area
self.next_hops = {}
@@ -41,14 +42,10 @@ class RoutingTableEngine(object):
def next_hops_changed(self, next_hops):
# Convert next_hops into routing table
self.next_hops = next_hops
- new_table = []
for _id, next_hop in next_hops.items():
- new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop))
- pair = ('_topo.%s.all' % (self.area), next_hop)
- if new_table.count(pair) == 0:
- new_table.append(pair)
-
- self.container.remote_routes_changed('topological', new_table)
+ mb_id = self.node_tracker.maskbit_for_node(_id)
+ mb_nh = self.node_tracker.maskbit_for_node(next_hop)
+ self.container.router_adapter.set_next_hop(mb_id, mb_nh)
def get_next_hops(self):
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c
index 2063b8fcc4..3aad9f3427 100644
--- a/qpid/extras/dispatch/src/agent.c
+++ b/qpid/extras/dispatch/src/agent.c
@@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx,
cls->query_handler = query_handler;
dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL);
- int result = hash_insert_const(agent->class_hash, iter, cls);
+ int result = hash_insert_const(agent->class_hash, iter, cls, 0);
dx_field_iterator_free(iter);
if (result < 0)
assert(false);
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 3500e08406..123e2ffb60 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt)
nt_item->ntype = nt;
sys_mutex_lock(container->lock);
- result = hash_insert_const(container->node_type_map, iter, nt);
+ result = hash_insert_const(container->node_type_map, iter, nt, 0);
DEQ_INSERT_TAIL(container->node_type_list, nt_item);
sys_mutex_unlock(container->lock);
@@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_dispatch_t *dx,
if (name) {
dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL);
sys_mutex_lock(container->lock);
- result = hash_insert(container->node_map, iter, node);
+ result = hash_insert(container->node_map, iter, node, 0);
sys_mutex_unlock(container->lock);
dx_field_iterator_free(iter);
if (result < 0) {
diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c
index 1f7d8aa3f5..49f79925f6 100644
--- a/qpid/extras/dispatch/src/hash.c
+++ b/qpid/extras/dispatch/src/hash.c
@@ -52,6 +52,15 @@ struct hash_t {
};
+struct hash_handle_t {
+ bucket_t *bucket;
+ hash_item_t *item;
+};
+
+ALLOC_DECLARE(hash_handle_t);
+ALLOC_DEFINE(hash_handle_t);
+
+
// djb2 hash algorithm
static unsigned long hash_function(dx_field_iterator_t *iter)
{
@@ -102,7 +111,7 @@ size_t hash_size(hash_t *h)
}
-static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists)
+static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists, hash_handle_t **handle)
{
unsigned long idx = hash_function(key) & h->bucket_mask;
hash_item_t *item = DEQ_HEAD(h->buckets[idx].items);
@@ -115,6 +124,8 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
if (item) {
*exists = 1;
+ if (handle)
+ *handle = 0;
return item;
}
@@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in
DEQ_INSERT_TAIL(h->buckets[idx].items, item);
h->size++;
*exists = 0;
+
+ //
+ // If a pointer to a handle-pointer was supplied, create a handle for this item.
+ //
+ if (handle) {
+ *handle = new_hash_handle_t();
+ (*handle)->bucket = &h->buckets[idx];
+ (*handle)->item = item;
+ }
+
return item;
}
-dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
+dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle)
{
int exists = 0;
- hash_item_t *item = hash_internal_insert(h, key, &exists);
+ hash_item_t *item = hash_internal_insert(h, key, &exists, handle);
if (!item)
return DX_ERROR_ALLOC;
@@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val)
}
-dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val)
+dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle)
{
assert(h->is_const);
int error = 0;
- hash_item_t *item = hash_internal_insert(h, key, &error);
+ hash_item_t *item = hash_internal_insert(h, key, &error, handle);
if (item)
item->v.val_const = val;
@@ -225,3 +246,31 @@ dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key)
return DX_ERROR_NOT_FOUND;
}
+
+void hash_handle_free(hash_handle_t *handle)
+{
+ if (handle)
+ free_hash_handle_t(handle);
+}
+
+
+const unsigned char *hash_key_by_handle(const hash_handle_t *handle)
+{
+ if (handle)
+ return handle->item->key;
+ return 0;
+}
+
+
+dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle)
+{
+ if (!handle)
+ return DX_ERROR_NOT_FOUND;
+ free(handle->item->key);
+ DEQ_REMOVE(handle->bucket->items, handle->item);
+ free_hash_item_t(handle->item);
+ h->size--;
+ free_hash_handle_t(handle);
+ return DX_ERROR_NONE;
+}
+
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index a3c41f86f5..c7c8f82dfc 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -398,7 +398,8 @@ typedef struct {
PyObject *handler;
PyObject *handler_rx_call;
dx_dispatch_t *dx;
- dx_address_t *address;
+ Py_ssize_t addr_count;
+ dx_address_t **addrs;
} IoAdapter;
@@ -469,25 +470,35 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id)
static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
{
- const char *address;
- if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
+ PyObject *addrs;
+ if (!PyArg_ParseTuple(args, "OO", &self->handler, &addrs))
return -1;
self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
return -1;
+ if (!PyTuple_Check(addrs))
+ return -1;
+
Py_INCREF(self->handler);
Py_INCREF(self->handler_rx_call);
- self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
+ self->dx = dispatch;
+ self->addr_count = PyTuple_Size(addrs);
+ self->addrs = NEW_PTR_ARRAY(dx_address_t, self->addr_count);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ self->addrs[idx] = dx_router_register_address(self->dx,
+ PyString_AS_STRING(PyTuple_GetItem(addrs, idx)),
+ dx_io_rx_handler, self);
return 0;
}
static void IoAdapter_dealloc(IoAdapter* self)
{
- dx_router_unregister_address(self->address);
+ for (Py_ssize_t idx = 0; idx < self->addr_count; idx++)
+ dx_router_unregister_address(self->addrs[idx]);
+ free(self->addrs);
Py_DECREF(self->handler);
Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index fea3695620..35fb834029 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -28,8 +28,9 @@
static char *module = "ROUTER";
-static char *local_prefix = "_local/";
-static char *topo_prefix = "_topo/";
+static char *local_prefix = "_local/";
+static char *topo_prefix = "_topo/";
+static char *direct_prefix;
/**
* Address Types and Processing:
@@ -55,7 +56,7 @@ ALLOC_DEFINE(dx_address_t);
ALLOC_DEFINE(dx_router_conn_t);
-static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
dx_router_link_ref_t *ref = new_dx_router_link_ref_t();
DEQ_ITEM_INIT(ref);
@@ -65,7 +66,7 @@ static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
-static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
{
if (link->ref) {
DEQ_REMOVE(*ref_list, link->ref);
@@ -75,6 +76,31 @@ static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro
}
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = new_dx_router_ref_t();
+ DEQ_ITEM_INIT(ref);
+ ref->router = rnode;
+ rnode->ref_count++;
+ DEQ_INSERT_TAIL(*ref_list, ref);
+}
+
+
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode)
+{
+ dx_router_ref_t *ref = DEQ_HEAD(*ref_list);
+ while (ref) {
+ if (ref->router == rnode) {
+ DEQ_REMOVE(*ref_list, ref);
+ free_dx_router_ref_t(ref);
+ rnode->ref_count--;
+ break;
+ }
+ ref = DEQ_NEXT(ref);
+ }
+}
+
+
/**
* Check an address to see if it no longer has any associated destinations.
* Depending on its policy, the address may be eligible for being closed out
@@ -262,10 +288,11 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
{
- dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
- dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
+ dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ dx_field_iterator_t *ingress_iter = 0;
dx_parsed_field_t *trace = 0;
dx_parsed_field_t *ingress = 0;
@@ -293,7 +320,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
trace_item = dx_parse_sub_value(trace, idx);
}
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_list(out_da);
}
@@ -303,15 +330,17 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
//
dx_compose_insert_string(out_da, DX_DA_INGRESS);
if (ingress && dx_parse_is_scalar(ingress)) {
- dx_field_iterator_t *iter = dx_parse_raw(ingress);
- dx_compose_insert_string_iterator(out_da, iter);
+ ingress_iter = dx_parse_raw(ingress);
+ dx_compose_insert_string_iterator(out_da, ingress_iter);
} else
- dx_compose_insert_string(out_da, router->router_id);
+ dx_compose_insert_string(out_da, direct_prefix);
dx_compose_end_map(out_da);
dx_message_set_delivery_annotations(msg, out_da);
dx_compose_free(out_da);
+
+ return ingress_iter;
}
@@ -394,7 +423,8 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
hash_retrieve(router->addr_hash, iter, (void*) &addr);
dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
- int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
+ int is_direct = dx_field_iterator_prefix(iter, direct_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -404,9 +434,10 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
//
//
- // Interpret and update the delivery annotations of the message
+ // Interpret and update the delivery annotations of the message. As a convenience,
+ // this function returns the iterator to the ingress field (if it exists).
//
- router_annotate_message(router, msg);
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
//
// Forward to the in-process handler for this message if there is one. The
@@ -446,31 +477,54 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
}
//
- // Forward to the next-hops for remote destinations.
+ // If the address form is direct to this router node, don't relay it on
+ // to any other part of the network.
//
- dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
- dx_router_link_t *dest_link;
- while (dest_node_ref) {
- if (dest_node_ref->router->next_hop)
- dest_link = dest_node_ref->router->next_hop->peer_link;
- else
- dest_link = dest_node_ref->router->peer_link;
- if (dest_link) {
- dx_routed_event_t *re = new_dx_routed_event_t();
- DEQ_ITEM_INIT(re);
- re->delivery = 0;
- re->message = dx_message_copy(msg);
- re->settle = 0;
- re->disposition = 0;
- DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
-
- fanout++;
- if (fanout == 1)
- re->delivery = delivery;
-
- dx_link_activate(dest_link->link);
+ if (!is_direct) {
+ //
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
+ //
+ int origin = -1;
+ if (ingress_iter) {
+ dx_address_t *origin_addr;
+ hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) {
+ dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes);
+ origin = rref->router->mask_bit;
+ }
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ if (origin >= 0) {
+ dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes);
+ dx_router_link_t *dest_link;
+ while (dest_node_ref) {
+ if (dest_node_ref->router->next_hop)
+ dest_link = dest_node_ref->router->next_hop->peer_link;
+ else
+ dest_link = dest_node_ref->router->peer_link;
+ if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) {
+ dx_routed_event_t *re = new_dx_routed_event_t();
+ DEQ_ITEM_INIT(re);
+ re->delivery = 0;
+ re->message = dx_message_copy(msg);
+ re->settle = 0;
+ re->disposition = 0;
+ DEQ_INSERT_TAIL(dest_link->msg_fifo, re);
+
+ fanout++;
+ if (fanout == 1)
+ re->delivery = delivery;
+
+ dx_link_activate(dest_link->link);
+ }
+ dest_node_ref = DEQ_NEXT(dest_node_ref);
+ }
}
- dest_node_ref = DEQ_NEXT(dest_node_ref);
}
}
}
@@ -633,10 +687,10 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
if (is_router) {
//
- // If this is a router link, put it in the router_address link-list.
+ // If this is a router link, put it in the hello_address link-list.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
- rlink->owning_addr = router->router_addr;
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
+ rlink->owning_addr = router->hello_addr;
router->out_links_by_mask_bit[rlink->mask_bit] = rlink;
} else {
@@ -667,7 +721,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
dx_field_iterator_free(iter);
@@ -806,7 +860,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
rlink->mask_bit = mask_bit;
rlink->link_type = DX_LINK_ROUTER;
rlink->link_direction = DX_OUTGOING;
- rlink->owning_addr = router->router_addr;
+ rlink->owning_addr = router->hello_addr;
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
@@ -814,9 +868,9 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co
DEQ_INIT(rlink->msg_fifo);
//
- // Add the new outgoing link to the router_address's list of links.
+ // Add the new outgoing link to the hello_address's list of links.
//
- dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink);
+ dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink);
//
// Index this link from the by-maskbit index so we can later find it quickly
@@ -867,6 +921,14 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
dx_container_register_node_type(dx, &router_node);
}
+ size_t dplen = 9 + strlen(area) + strlen(id);
+ direct_prefix = (char*) malloc(dplen);
+ strcpy(direct_prefix, "_topo/");
+ strcat(direct_prefix, area);
+ strcat(direct_prefix, "/");
+ strcat(direct_prefix, id);
+ strcat(direct_prefix, "/");
+
dx_router_t *router = NEW(dx_router_t);
router_node.type_context = router;
@@ -883,8 +945,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
DEQ_INIT(router->routers);
router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width());
- for (int idx = 0; idx < dx_bitmask_width(); idx++)
+ router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width());
+ for (int idx = 0; idx < dx_bitmask_width(); idx++) {
router->out_links_by_mask_bit[idx] = 0;
+ router->routers_by_mask_bit[idx] = 0;
+ }
router->neighbor_free_mask = dx_bitmask(1);
router->lock = sys_mutex();
@@ -894,10 +959,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router->pyTick = 0;
//
- // Create an address for all of the routers in the topology. It will be registered
+ // Create addresses for all of the routers in the topology. It will be registered
// locally later in the initialization sequence.
//
router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0);
+ router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0);
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -935,8 +1001,7 @@ void dx_router_free(dx_router_t *router)
const char *dx_router_id(const dx_dispatch_t *dx)
{
- dx_router_t *router = dx->router;
- return router->router_id;
+ return direct_prefix;
}
@@ -963,7 +1028,7 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
addr->handler_context = 0;
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- hash_insert(router->addr_hash, iter, addr);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
index 7f1f7486fb..a21e4f1d5c 100644
--- a/qpid/extras/dispatch/src/router_private.h
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -66,10 +66,11 @@ DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
struct dx_router_node_t {
DEQ_LINKS(dx_router_node_t);
- const char *id;
+ dx_address_t *owning_addr;
int mask_bit;
dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
+ uint32_t ref_count;
dx_bitmask_t *valid_origins;
};
@@ -107,6 +108,7 @@ struct dx_address_t {
void *handler_context; // In-Process Consumer context
dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers
dx_router_ref_list_t rnodes; // Remotely-Connected Consumers
+ hash_handle_t *hash_handle; // Linkage back to the hash table entry
};
ALLOC_DECLARE(dx_address_t);
@@ -122,10 +124,12 @@ struct dx_router_t {
dx_address_list_t addrs;
hash_t *addr_hash;
dx_address_t *router_addr;
+ dx_address_t *hello_addr;
dx_router_link_list_t links;
dx_router_node_list_t routers;
dx_router_link_t **out_links_by_mask_bit;
+ dx_router_node_t **routers_by_mask_bit;
dx_bitmask_t *neighbor_free_mask;
sys_mutex_t *lock;
@@ -136,4 +140,12 @@ struct dx_router_t {
PyObject *pyTick;
};
+
+void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
+
+void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+
+
#endif
diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c
index 9930c5dde9..7ada6dad4b 100644
--- a/qpid/extras/dispatch/src/router_pynode.c
+++ b/qpid/extras/dispatch/src/router_pynode.c
@@ -34,34 +34,268 @@ typedef struct {
} RouterAdapter;
-static PyObject* dx_router_node_updated(PyObject *self, PyObject *args)
+static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1)
+ return "Link bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] != 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding router over already existing router";
+ }
+
+ if (link_maskbit >= 0 && router->out_links_by_mask_bit[link_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Adding neighbor router with invalid link reference";
+ }
+
+ //
+ // Hash lookup the address to ensure there isn't an existing router address.
+ //
+ dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+
+ hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ assert(addr == 0);
+
+ //
+ // Create an address record for this router and insert it in the hash table.
+ // This record will be found whenever a "foreign" topological address to this
+ // remote router is looked up.
+ //
+ addr = new_dx_address_t();
+ DEQ_ITEM_INIT(addr);
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+
+ //
+ // Create a router-node record to represent the remote router.
+ //
+ dx_router_node_t *rnode = new_dx_router_node_t();
+ DEQ_ITEM_INIT(rnode);
+ rnode->owning_addr = addr;
+ rnode->mask_bit = router_maskbit;
+ rnode->next_hop = 0;
+ rnode->peer_link = 0;
+ rnode->ref_count = 0;
+ rnode->valid_origins = dx_bitmask(0);
+
+ DEQ_INSERT_TAIL(router->routers, rnode);
+
+ //
+ // Link the router record to the address record.
+ //
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ //
+ // Link the router record to the router address record.
+ //
+ dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
+
+ //
+ // Add the router record to the mask-bit index.
+ //
+ router->routers_by_mask_bit[router_maskbit] = rnode;
+
+ //
+ // If this is a neighbor router, add the peer_link reference to the
+ // router record.
+ //
+ if (link_maskbit >= 0)
+ rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static char *dx_del_router(dx_router_t *router, int router_maskbit)
+{
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ return "Router bit mask out of range";
+
+ sys_mutex_lock(router->lock);
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ sys_mutex_unlock(router->lock);
+ return "Deleting nonexistent router";
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ dx_address_t *oaddr = rnode->owning_addr;
+ assert(oaddr);
+
+ //
+ // Unlink the router node from the address record
+ //
+ dx_router_del_node_ref_LH(&oaddr->rnodes, rnode);
+
+ //
+ // While the router node has a non-zero reference count, look for addresses
+ // to unlink the node from.
+ //
+ dx_address_t *addr = DEQ_HEAD(router->addrs);
+ while (addr && rnode->ref_count > 0) {
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ addr = DEQ_NEXT(addr);
+ }
+ assert(rnode->ref_count == 0);
+
+ //
+ // Free the router node and the owning address records.
+ //
+ dx_bitmask_free(rnode->valid_origins);
+ free_dx_router_node_t(rnode);
+
+ hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
+ DEQ_REMOVE(router->addrs, oaddr);
+ hash_handle_free(oaddr->hash_handle);
+ router->routers_by_mask_bit[router_maskbit] = 0;
+ free_dx_address_t(oaddr);
+
+ sys_mutex_unlock(router->lock);
+ return 0;
+}
+
+
+static PyObject* dx_add_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
const char *address;
- int is_reachable;
- int is_neighbor;
- int link_maskbit;
int router_maskbit;
- if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor,
- &link_maskbit, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
- // TODO
+ char *error = dx_add_router(router, address, router_maskbit, -1);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
Py_INCREF(Py_None);
return Py_None;
}
-static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
+static PyObject* dx_del_remote_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_set_next_hop(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+ int next_hop_maskbit;
+
+ if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
+ return 0;
+
+ if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[router_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Next Hop Not Found");
+ return 0;
+ }
+
+ if (router_maskbit != next_hop_maskbit) {
+ dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *address;
+ int router_maskbit;
+ int link_maskbit;
+
+ if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit))
+ return 0;
+
+ char *error = dx_add_router(router, address, router_maskbit, link_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
+{
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ int router_maskbit;
+
+ if (!PyArg_ParseTuple(args, "i", &router_maskbit))
+ return 0;
+
+ char *error = dx_del_router(router, router_maskbit);
+ if (error) {
+ PyErr_SetString(PyExc_Exception, error);
+ return 0;
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -71,13 +305,14 @@ static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
}
-static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
+static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
//RouterAdapter *adapter = (RouterAdapter*) self;
- const char *addr;
- const char *peer;
+ //dx_router_t *router = adapter->router;
+ const char *addr;
+ int router_maskbit;
- if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
return 0;
// TODO
@@ -88,9 +323,13 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
static PyMethodDef RouterAdapter_methods[] = {
- {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"},
- {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
- {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
+ {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
+ {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
+ {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
+ {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
+ {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
{0, 0, 0, 0}
};
diff --git a/qpid/extras/dispatch/tests/field_test.c b/qpid/extras/dispatch/tests/field_test.c
index 59fc3c4cfb..33f9fe5d03 100644
--- a/qpid/extras/dispatch/tests/field_test.c
+++ b/qpid/extras/dispatch/tests/field_test.c
@@ -115,6 +115,10 @@ static char* test_view_address_hash(void *context)
{"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"},
{"amqp:/_topo/all/all/local/sub", "Llocal/sub"},
{"amqp://host:port/_local/my-addr", "Lmy-addr"},
+ {"_topo/area/router/my-addr", "Aarea"},
+ {"_topo/my-area/router/my-addr", "Rrouter"},
+ {"_topo/my-area/my-router/my-addr", "Lmy-addr"},
+ {"_topo/my-area/router", "Rrouter"},
{0, 0}
};
int idx;
diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py
index 7e3940fda4..47917b0f5f 100644
--- a/qpid/extras/dispatch/tests/router_engine_test.py
+++ b/qpid/extras/dispatch/tests/router_engine_test.py
@@ -97,62 +97,74 @@ class NodeTrackerTest(unittest.TestCase):
def log(self, level, text):
pass
- def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
+ def add_neighbor_router(self, address, router_bit, link_bit):
self.address = address
- self.reachable = reachable
- self.neighbor = neighbor
+ self.router_bit = router_bit
self.link_bit = link_bit
+ self.calls += 1
+
+ def del_neighbor_router(self, router_bit):
+ self.address = None
self.router_bit = router_bit
+ self.link_bit = None
+ self.calls += 1
- def reset(self):
+ def add_remote_router(self, address, router_bit):
+ self.address = address
+ self.router_bit = router_bit
+ self.link_bit = None
+ self.calls += 1
+
+ def del_remote_router(self, router_bit):
self.address = None
- self.reachable = None
- self.neighbor = None
+ self.router_bit = router_bit
self.link_bit = None
+ self.calls += 1
+
+ def reset(self):
+ self.address = None
self.router_bit = None
+ self.link_bit = None
+ self.area = "area"
+ self.calls = 0
def test_node_tracker_limits(self):
tracker = NodeTracker(self, 5)
self.reset()
tracker.new_neighbor('A', 1)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 1)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('B', 5)
- self.assertEqual(self.address, 'RB')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/B')
self.assertEqual(self.link_bit, 5)
self.assertEqual(self.router_bit, 1)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('C', 6)
- self.assertEqual(self.address, 'RC')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/C')
self.assertEqual(self.link_bit, 6)
self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('D', 7)
- self.assertEqual(self.address, 'RD')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/D')
self.assertEqual(self.link_bit, 7)
self.assertEqual(self.router_bit, 3)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('E', 8)
- self.assertEqual(self.address, 'RE')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/E')
self.assertEqual(self.link_bit, 8)
self.assertEqual(self.router_bit, 4)
+ self.assertEqual(self.calls, 1)
self.reset()
try:
@@ -163,16 +175,15 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.lost_neighbor('C')
- self.assertEqual(self.address, 'RC')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('F', 9)
- self.assertEqual(self.address, 'RF')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/F')
self.assertEqual(self.link_bit, 9)
self.assertEqual(self.router_bit, 2)
+ self.assertEqual(self.calls, 1)
def test_node_tracker_remote_neighbor(self):
@@ -180,32 +191,29 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.new_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertFalse(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertFalse(self.link_bit)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_neighbor('A', 3)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 3)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 2)
self.reset()
tracker.lost_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
- self.assertEqual(self.link_bit, 3)
- self.assertEqual(self.router_bit, 0)
+ self.assertFalse(self.address)
+ self.assertFalse(self.link_bit)
+ self.assertFalse(self.router_bit)
+ self.assertEqual(self.calls, 0)
self.reset()
tracker.lost_neighbor('A')
- self.assertEqual(self.address, 'RA')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
def test_node_tracker_neighbor_remote(self):
@@ -213,32 +221,28 @@ class NodeTrackerTest(unittest.TestCase):
self.reset()
tracker.new_neighbor('A', 3)
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.link_bit, 3)
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
self.reset()
tracker.new_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertTrue(self.neighbor)
- self.assertEqual(self.link_bit, 3)
- self.assertEqual(self.router_bit, 0)
+ self.assertFalse(self.address)
+ self.assertFalse(self.link_bit)
+ self.assertFalse(self.router_bit)
+ self.assertEqual(self.calls, 0)
self.reset()
tracker.lost_neighbor('A')
- self.assertEqual(self.address, 'RA')
- self.assertTrue(self.reachable)
- self.assertFalse(self.neighbor)
- self.assertFalse(self.link_bit)
+ self.assertEqual(self.address, 'amqp:/_topo/area/A')
self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 2)
self.reset()
tracker.lost_node('A')
- self.assertEqual(self.address, 'RA')
- self.assertFalse(self.reachable)
+ self.assertEqual(self.router_bit, 0)
+ self.assertEqual(self.calls, 1)
class NeighborTest(unittest.TestCase):
@@ -274,7 +278,7 @@ class NeighborTest(unittest.TestCase):
self.engine.tick(1.5)
self.assertEqual(len(self.sent), 1)
dest, msg = self.sent.pop(0)
- self.assertEqual(dest, "_local/qdxrouter")
+ self.assertEqual(dest, "amqp:/_local/qdxhello")
self.assertEqual(msg.get_opcode(), "HELLO")
self.assertEqual(msg.id, self.id)
self.assertEqual(msg.area, self.area)
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
index b25568d283..8d486e2b85 100644
--- a/qpid/extras/dispatch/tests/system_tests_one_router.py
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -310,7 +310,7 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
self.assertFalse('qdx.trace' in da)
##
@@ -346,7 +346,7 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
self.assertFalse('qdx.trace' in da)
##
@@ -364,8 +364,8 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
- self.assertEqual(da['qdx.trace'], ['Qpid.Dispatch.Router.A'])
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Non-empty trace
@@ -382,8 +382,8 @@ class RouterTest(unittest.TestCase):
self.assertEqual(i, rm.body['number'])
da = rm.instructions
self.assertEqual(da.__class__, dict)
- self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A')
- self.assertEqual(da['qdx.trace'], ['first.hop', 'Qpid.Dispatch.Router.A'])
+ self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
+ self.assertEqual(da['qdx.trace'], ['first.hop', '_topo/area/Qpid.Dispatch.Router.A/'])
M1.stop()
M2.stop()
diff --git a/qpid/extras/dispatch/tests/tworouters-A.conf b/qpid/extras/dispatch/tests/tworouters-A.conf
new file mode 100644
index 0000000000..b4a5770c2d
--- /dev/null
+++ b/qpid/extras/dispatch/tests/tworouters-A.conf
@@ -0,0 +1,54 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+ ##
+ ## worker-threads - The number of threads that will be created to
+ ## process message traffic and other application work (timers, non-amqp
+ ## file descriptors, etc.)
+ ##
+ ## The number of threads should be related to the number of available
+ ## processor cores. To fully utilize a quad-core system, set the
+ ## number of threads to 4.
+ ##
+ worker-threads: 4
+
+ ##
+ ## container-name - The name of the AMQP container. If not specified,
+ ## the container name will be set to a value of the container's
+ ## choosing. The automatically assigned container name is not
+ ## guaranteed to be persistent across restarts of the container.
+ ##
+ container-name: Qpid.Dispatch.Router.A
+}
+
+
+##
+## Listeners and Connectors
+##
+listener {
+ addr: 0.0.0.0
+ port: 20001
+ sasl-mechanisms: ANONYMOUS
+}
+
diff --git a/qpid/extras/dispatch/tests/tworouters-B.conf b/qpid/extras/dispatch/tests/tworouters-B.conf
new file mode 100644
index 0000000000..d56bd45078
--- /dev/null
+++ b/qpid/extras/dispatch/tests/tworouters-B.conf
@@ -0,0 +1,61 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+ ##
+ ## worker-threads - The number of threads that will be created to
+ ## process message traffic and other application work (timers, non-amqp
+ ## file descriptors, etc.)
+ ##
+ ## The number of threads should be related to the number of available
+ ## processor cores. To fully utilize a quad-core system, set the
+ ## number of threads to 4.
+ ##
+ worker-threads: 4
+
+ ##
+ ## container-name - The name of the AMQP container. If not specified,
+ ## the container name will be set to a value of the container's
+ ## choosing. The automatically assigned container name is not
+ ## guaranteed to be persistent across restarts of the container.
+ ##
+ container-name: Qpid.Dispatch.Router.B
+}
+
+
+##
+## Listeners and Connectors
+##
+listener {
+ addr: 0.0.0.0
+ port: 20002
+ sasl-mechanisms: ANONYMOUS
+}
+
+connector {
+ label: Router Uplink
+ addr: 0.0.0.0
+ port: 20001
+ sasl-mechanisms: ANONYMOUS
+}
+