diff options
21 files changed, 776 insertions, 216 deletions
diff --git a/qpid/extras/dispatch/TODO b/qpid/extras/dispatch/TODO index ab0c1a951e..0ae3988d29 100644 --- a/qpid/extras/dispatch/TODO +++ b/qpid/extras/dispatch/TODO @@ -5,3 +5,50 @@ enhancements to be fixed by going to the Apache Qpid JIRA instance: http://issues.apache.org/jira/browse/QPID ============================================================================== + +- Router Mode: + o Stand-Alone-Router - Does not participate in routing protocol, does not permit inter-router + links, acts as a normal interior-router otherwise. + o Interior-Router - Participates in the routing protocol + o Edge-Concentrator - Does not participate in routing protocol, requires uplink connection(s) + This mode should be used when Dispatch is integrated into an endpoint + application or when it is acting as a connection concentrator. + Proxy and access-protocol functions will be available in this mode. + +- Connection Annotation: + o Type: Inter-router, uplink, endpoint, etc. This formal annotation can be accessed internally + by the connection handlers to guide Dispatch's handling of new connections. + o Weight-{in,out}: Weight/Cost metrics for inter-router links + +- Statistics for Instrumentation: + o Link + . delivery count {unsettled, pre-settled} + . deliveries {accepted, rejected, released, modified} + . octets of delivery {accepted, rejected, released, modified} + . flow frame count + . disposition frame count {forward, backward} + o Address + . deliveries {ingress, egress, transit} + . octets of delivery {ingress, egress, transit} + +- Infrastructure + o Router_Link - Buffer and Iterator for a copy of the link's target address (for use + as an address for messages with no 'to' field). + o Router Event Queue - Event queue to feed alerts to the Python router code. + Neighbor-link-loss is a valuable event because it accelerates the + detection of topology change. + o All PyRouter stimulus through a work queue. + o Router Code Updates + . Remove all vestiges of "binding" + . Calculate the valid-origin mask for each path + . Report address mappings to routers + o Expose idle-timeout/keepalive on connectors and listeners + +- Major Roadmap Features + o Security Policy Enforcement + o Proxy (Translation Node) Capability + o Address Provisioning with variable semantics + o Link Routing + o Management, Instrumentation, and Accounting + o Link Cost + o Area Routing diff --git a/qpid/extras/dispatch/include/qpid/dispatch/compose.h b/qpid/extras/dispatch/include/qpid/dispatch/compose.h index 7d668f0ac0..d68f6be746 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/compose.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/compose.h @@ -167,7 +167,8 @@ void dx_compose_insert_string(dx_composed_field_t *field, const char *value); * Insert a utf8-encoded string into the field from an iterator * * @param field A field created by dx_compose. - * @param value A pointer to a null-terminated string. + * @param iter An iterator for a string value. The caller is responsible for freeing + * this iterator after the call is complete. */ void dx_compose_insert_string_iterator(dx_composed_field_t *field, dx_field_iterator_t *iter); diff --git a/qpid/extras/dispatch/include/qpid/dispatch/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h index bfad142517..bf22c2c603 100644 --- a/qpid/extras/dispatch/include/qpid/dispatch/hash.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h @@ -23,16 +23,22 @@ #include <qpid/dispatch/iterator.h> #include <qpid/dispatch/error.h> -typedef struct hash_t hash_t; +typedef struct hash_t hash_t; +typedef struct hash_handle_t hash_handle_t; hash_t *hash(int bucket_exponent, int batch_size, int value_is_const); void hash_free(hash_t *h); size_t hash_size(hash_t *h); -dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); -dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); +dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle); +dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle); dx_error_t hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); dx_error_t hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key); +void hash_handle_free(hash_handle_t *handle); +const unsigned char *hash_key_by_handle(const hash_handle_t *handle); +dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle); + + #endif diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py index 46e03379b4..4a770684cd 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/link.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/link.py @@ -101,7 +101,7 @@ class LinkStateEngine(object): if self.id not in self.collection: return my_ls = self.collection[self.id] - self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) def new_local_link_state(self, link_state): @@ -132,7 +132,7 @@ class LinkStateEngine(object): def _send_lsrs(self): for (_area, _id) in self.needed_lsrs.keys(): - self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageLSR(None, self.id, self.area)) self.needed_lsrs = {} @@ -140,4 +140,4 @@ class LinkStateEngine(object): ls_seq = 0 if self.id in self.collection: ls_seq = self.collection[self.id].ls_seq - self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) + self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py index 5413dd38a8..0707f7f250 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py @@ -56,7 +56,7 @@ class MobileAddressEngine(object): ## if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: self.mobile_seq += 1 - self.container.send('_topo.%s.all' % self.area, + self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) self.local_keys.extend(self.added_keys) for key in self.deleted_keys: @@ -162,7 +162,7 @@ class MobileAddressEngine(object): if msg.id == self.id: return if msg.have_seq < self.mobile_seq: - self.container.send('_topo.%s.%s' % (msg.area, msg.id), + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) @@ -183,6 +183,6 @@ class MobileAddressEngine(object): def _send_mars(self): for _id, _area, _seq in self.needed_mars.keys(): - self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) + self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) self.needed_mars = {} diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py index 6a16049065..37b7888efe 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/neighbor.py @@ -49,7 +49,7 @@ class NeighborEngine(object): if now - self.last_hello_time >= self.hello_interval: self.last_hello_time = now - self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) + self.container.send('amqp:/_local/qdxhello', MessageHELLO(None, self.id, self.area, self.hellos.keys())) if self.link_state_changed: self.link_state_changed = False diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py index 433ce3ab1e..b1f6df2662 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py @@ -50,10 +50,16 @@ class NodeTracker(object): A node, designated by node_id, has been discovered as a neighbor over a link with a maskbit of link_maskbit. """ - if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit()) - self.nodes[node_id].set_neighbor(link_maskbit) - self._notify(self.nodes[node_id]) + if node_id in self.nodes: + node = self.nodes[node_id] + if node.neighbor: + return + self.container.del_remote_router(node.maskbit) + node.neighbor = True + else: + node = RemoteNode(node_id, self._allocate_maskbit(), True) + self.nodes[node_id] = node + self.container.add_neighbor_router(self._address(node_id), node.maskbit, link_maskbit) def lost_neighbor(self, node_id): @@ -61,9 +67,11 @@ class NodeTracker(object): We have lost contact with a neighboring node node_id. """ node = self.nodes[node_id] - node.clear_neighbor() - self._notify(node) - if node.to_delete(): + node.neighbor = False + self.container.del_neighbor_router(node.maskbit) + if node.remote: + self.container.add_remote_router(self._address(node.id), node.maskbit) + else: self._free_maskbit(node.maskbit) self.nodes.pop(node_id) @@ -74,9 +82,12 @@ class NodeTracker(object): remote peer. """ if node_id not in self.nodes: - self.nodes[node_id] = RemoteNode(node_id, self._allocate_maskbit()) - self.nodes[node_id].set_remote() - self._notify(self.nodes[node_id]) + node = RemoteNode(node_id, self._allocate_maskbit(), False) + self.nodes[node_id] = node + self.container.add_remote_router(self._address(node.id), node.maskbit) + else: + node = self.nodes[node_id] + node.remote = True def lost_node(self, node_id): @@ -84,11 +95,21 @@ class NodeTracker(object): A remote node, node_id, has not been heard from for too long and is being deemed lost. """ node = self.nodes[node_id] - node.clear_remote() - self._notify(node) - if node.to_delete(): - self._free_maskbit(node.maskbit) - self.nodes.pop(node_id) + if node.remote: + node.remote = False + if not node.neighbor: + self.container.del_remote_router(node.maskbit) + self._free_maskbit(node.maskbit) + self.nodes.pop(node_id) + + + def maskbit_for_node(self, node_id): + """ + """ + node = self.nodes[node_id] + if node: + return node.maskbit + return None def _allocate_maskbit(self): @@ -110,39 +131,15 @@ class NodeTracker(object): self.next_maskbit = i - def _notify(self, node): - if node.to_delete(): - self.container.node_updated("R%s" % node.id, 0, 0, 0, 0) - else: - is_neighbor = 0 - if node.neighbor: - is_neighbor = 1 - self.container.node_updated("R%s" % node.id, 1, is_neighbor, node.link_maskbit, node.maskbit) + def _address(self, node_id): + return "amqp:/_topo/%s/%s" % (self.container.area, node_id) class RemoteNode(object): - def __init__(self, node_id, maskbit): - self.id = node_id - self.neighbor = None - self.link_maskbit = None - self.maskbit = maskbit - self.remote = None - - def set_neighbor(self, link_maskbit): - self.neighbor = True - self.link_maskbit = link_maskbit - - def set_remote(self): - self.remote = True - - def clear_neighbor(self): - self.neighbor = None - self.link_maskbit = None - - def clear_remote(self): - self.remote = None - - def to_delete(self): - return self.neighbor == None and self.remote == None + def __init__(self, node_id, maskbit, neighbor): + self.id = node_id + self.maskbit = maskbit + self.neighbor = neighbor + self.remote = not neighbor diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py index 6fde822444..c2c623f980 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -55,7 +55,7 @@ class RouterEngine: self.domain = "domain" self.router_adapter = router_adapter self.log_adapter = LogAdapter("dispatch.router") - self.io_adapter = IoAdapter(self, "qdxrouter") + self.io_adapter = IoAdapter(self, ("qdxrouter", "qdxhello")) self.max_routers = max_routers self.id = router_id self.area = area @@ -71,14 +71,14 @@ class RouterEngine: ## ## Launch the sub-module engines ## + self.node_tracker = NodeTracker(self, self.max_routers) self.neighbor_engine = NeighborEngine(self) self.link_state_engine = LinkStateEngine(self) self.path_engine = PathEngine(self) self.mobile_address_engine = MobileAddressEngine(self) - self.routing_table_engine = RoutingTableEngine(self) + self.routing_table_engine = RoutingTableEngine(self, self.node_tracker) self.binding_engine = BindingEngine(self) self.adapter_engine = AdapterEngine(self) - self.node_tracker = NodeTracker(self, self.max_routers) @@ -278,7 +278,20 @@ class RouterEngine: self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid) self.node_tracker.lost_node(rid) - def node_updated(self, address, reachable, neighbor, link_bit, router_bit): - self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \ - (address, reachable, neighbor, link_bit, router_bit)) - self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit) + def add_neighbor_router(self, address, router_bit, link_bit): + self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, router_bit=%d, link_bit=%d" % \ + (address, router_bit, link_bit)) + self.router_adapter.add_neighbor_router(address, router_bit, link_bit) + + def del_neighbor_router(self, router_bit): + self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % router_bit) + self.router_adapter.del_neighbor_router(router_bit) + + def add_remote_router(self, address, router_bit): + self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, router_bit=%d" % (address, router_bit)) + self.router_adapter.add_remote_router(address, router_bit) + + def del_remote_router(self, router_bit): + self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % router_bit) + self.router_adapter.del_remote_router(router_bit) + diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py index 39b34bbc9b..af974b402a 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/routing.py @@ -27,8 +27,9 @@ class RoutingTableEngine(object): This module is responsible for converting the set of next hops to remote routers to a routing table in the "topological" address class. """ - def __init__(self, container): + def __init__(self, container, node_tracker): self.container = container + self.node_tracker = node_tracker self.id = self.container.id self.area = self.container.area self.next_hops = {} @@ -41,14 +42,10 @@ class RoutingTableEngine(object): def next_hops_changed(self, next_hops): # Convert next_hops into routing table self.next_hops = next_hops - new_table = [] for _id, next_hop in next_hops.items(): - new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) - pair = ('_topo.%s.all' % (self.area), next_hop) - if new_table.count(pair) == 0: - new_table.append(pair) - - self.container.remote_routes_changed('topological', new_table) + mb_id = self.node_tracker.maskbit_for_node(_id) + mb_nh = self.node_tracker.maskbit_for_node(next_hop) + self.container.router_adapter.set_next_hop(mb_id, mb_nh) def get_next_hops(self): diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 2063b8fcc4..3aad9f3427 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -295,7 +295,7 @@ dx_agent_class_t *dx_agent_register_class(dx_dispatch_t *dx, cls->query_handler = query_handler; dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); - int result = hash_insert_const(agent->class_hash, iter, cls); + int result = hash_insert_const(agent->class_hash, iter, cls, 0); dx_field_iterator_free(iter); if (result < 0) assert(false); diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index 3500e08406..123e2ffb60 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -507,7 +507,7 @@ int dx_container_register_node_type(dx_dispatch_t *dx, const dx_node_type_t *nt) nt_item->ntype = nt; sys_mutex_lock(container->lock); - result = hash_insert_const(container->node_type_map, iter, nt); + result = hash_insert_const(container->node_type_map, iter, nt, 0); DEQ_INSERT_TAIL(container->node_type_list, nt_item); sys_mutex_unlock(container->lock); @@ -565,7 +565,7 @@ dx_node_t *dx_container_create_node(dx_dispatch_t *dx, if (name) { dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); sys_mutex_lock(container->lock); - result = hash_insert(container->node_map, iter, node); + result = hash_insert(container->node_map, iter, node, 0); sys_mutex_unlock(container->lock); dx_field_iterator_free(iter); if (result < 0) { diff --git a/qpid/extras/dispatch/src/hash.c b/qpid/extras/dispatch/src/hash.c index 1f7d8aa3f5..49f79925f6 100644 --- a/qpid/extras/dispatch/src/hash.c +++ b/qpid/extras/dispatch/src/hash.c @@ -52,6 +52,15 @@ struct hash_t { }; +struct hash_handle_t { + bucket_t *bucket; + hash_item_t *item; +}; + +ALLOC_DECLARE(hash_handle_t); +ALLOC_DEFINE(hash_handle_t); + + // djb2 hash algorithm static unsigned long hash_function(dx_field_iterator_t *iter) { @@ -102,7 +111,7 @@ size_t hash_size(hash_t *h) } -static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists) +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *exists, hash_handle_t **handle) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); @@ -115,6 +124,8 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in if (item) { *exists = 1; + if (handle) + *handle = 0; return item; } @@ -128,14 +139,24 @@ static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, in DEQ_INSERT_TAIL(h->buckets[idx].items, item); h->size++; *exists = 0; + + // + // If a pointer to a handle-pointer was supplied, create a handle for this item. + // + if (handle) { + *handle = new_hash_handle_t(); + (*handle)->bucket = &h->buckets[idx]; + (*handle)->item = item; + } + return item; } -dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) +dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val, hash_handle_t **handle) { int exists = 0; - hash_item_t *item = hash_internal_insert(h, key, &exists); + hash_item_t *item = hash_internal_insert(h, key, &exists, handle); if (!item) return DX_ERROR_ALLOC; @@ -149,12 +170,12 @@ dx_error_t hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) } -dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) +dx_error_t hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val, hash_handle_t **handle) { assert(h->is_const); int error = 0; - hash_item_t *item = hash_internal_insert(h, key, &error); + hash_item_t *item = hash_internal_insert(h, key, &error, handle); if (item) item->v.val_const = val; @@ -225,3 +246,31 @@ dx_error_t hash_remove(hash_t *h, dx_field_iterator_t *key) return DX_ERROR_NOT_FOUND; } + +void hash_handle_free(hash_handle_t *handle) +{ + if (handle) + free_hash_handle_t(handle); +} + + +const unsigned char *hash_key_by_handle(const hash_handle_t *handle) +{ + if (handle) + return handle->item->key; + return 0; +} + + +dx_error_t hash_remove_by_handle(hash_t *h, hash_handle_t *handle) +{ + if (!handle) + return DX_ERROR_NOT_FOUND; + free(handle->item->key); + DEQ_REMOVE(handle->bucket->items, handle->item); + free_hash_item_t(handle->item); + h->size--; + free_hash_handle_t(handle); + return DX_ERROR_NONE; +} + diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c index a3c41f86f5..c7c8f82dfc 100644 --- a/qpid/extras/dispatch/src/python_embedded.c +++ b/qpid/extras/dispatch/src/python_embedded.c @@ -398,7 +398,8 @@ typedef struct { PyObject *handler; PyObject *handler_rx_call; dx_dispatch_t *dx; - dx_address_t *address; + Py_ssize_t addr_count; + dx_address_t **addrs; } IoAdapter; @@ -469,25 +470,35 @@ static void dx_io_rx_handler(void *context, dx_message_t *msg, int link_id) static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) { - const char *address; - if (!PyArg_ParseTuple(args, "Os", &self->handler, &address)) + PyObject *addrs; + if (!PyArg_ParseTuple(args, "OO", &self->handler, &addrs)) return -1; self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive"); if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call)) return -1; + if (!PyTuple_Check(addrs)) + return -1; + Py_INCREF(self->handler); Py_INCREF(self->handler_rx_call); - self->dx = dispatch; - self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self); + self->dx = dispatch; + self->addr_count = PyTuple_Size(addrs); + self->addrs = NEW_PTR_ARRAY(dx_address_t, self->addr_count); + for (Py_ssize_t idx = 0; idx < self->addr_count; idx++) + self->addrs[idx] = dx_router_register_address(self->dx, + PyString_AS_STRING(PyTuple_GetItem(addrs, idx)), + dx_io_rx_handler, self); return 0; } static void IoAdapter_dealloc(IoAdapter* self) { - dx_router_unregister_address(self->address); + for (Py_ssize_t idx = 0; idx < self->addr_count; idx++) + dx_router_unregister_address(self->addrs[idx]); + free(self->addrs); Py_DECREF(self->handler); Py_DECREF(self->handler_rx_call); self->ob_type->tp_free((PyObject*)self); diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index fea3695620..35fb834029 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -28,8 +28,9 @@ static char *module = "ROUTER"; -static char *local_prefix = "_local/"; -static char *topo_prefix = "_topo/"; +static char *local_prefix = "_local/"; +static char *topo_prefix = "_topo/"; +static char *direct_prefix; /** * Address Types and Processing: @@ -55,7 +56,7 @@ ALLOC_DEFINE(dx_address_t); ALLOC_DEFINE(dx_router_conn_t); -static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) { dx_router_link_ref_t *ref = new_dx_router_link_ref_t(); DEQ_ITEM_INIT(ref); @@ -65,7 +66,7 @@ static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro } -static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) +void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link) { if (link->ref) { DEQ_REMOVE(*ref_list, link->ref); @@ -75,6 +76,31 @@ static void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_ro } +void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) +{ + dx_router_ref_t *ref = new_dx_router_ref_t(); + DEQ_ITEM_INIT(ref); + ref->router = rnode; + rnode->ref_count++; + DEQ_INSERT_TAIL(*ref_list, ref); +} + + +void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode) +{ + dx_router_ref_t *ref = DEQ_HEAD(*ref_list); + while (ref) { + if (ref->router == rnode) { + DEQ_REMOVE(*ref_list, ref); + free_dx_router_ref_t(ref); + rnode->ref_count--; + break; + } + ref = DEQ_NEXT(ref); + } +} + + /** * Check an address to see if it no longer has any associated destinations. * Depending on its policy, the address may be eligible for being closed out @@ -262,10 +288,11 @@ static int router_writable_link_handler(void* context, dx_link_t *link) } -static void router_annotate_message(dx_router_t *router, dx_message_t *msg) +static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg) { - dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); - dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); + dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); + dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); + dx_field_iterator_t *ingress_iter = 0; dx_parsed_field_t *trace = 0; dx_parsed_field_t *ingress = 0; @@ -293,7 +320,7 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) trace_item = dx_parse_sub_value(trace, idx); } - dx_compose_insert_string(out_da, router->router_id); + dx_compose_insert_string(out_da, direct_prefix); dx_compose_end_list(out_da); } @@ -303,15 +330,17 @@ static void router_annotate_message(dx_router_t *router, dx_message_t *msg) // dx_compose_insert_string(out_da, DX_DA_INGRESS); if (ingress && dx_parse_is_scalar(ingress)) { - dx_field_iterator_t *iter = dx_parse_raw(ingress); - dx_compose_insert_string_iterator(out_da, iter); + ingress_iter = dx_parse_raw(ingress); + dx_compose_insert_string_iterator(out_da, ingress_iter); } else - dx_compose_insert_string(out_da, router->router_id); + dx_compose_insert_string(out_da, direct_prefix); dx_compose_end_map(out_da); dx_message_set_delivery_annotations(msg, out_da); dx_compose_free(out_da); + + return ingress_iter; } @@ -394,7 +423,8 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); hash_retrieve(router->addr_hash, iter, (void*) &addr); dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); - int is_local = dx_field_iterator_prefix(iter, local_prefix); + int is_local = dx_field_iterator_prefix(iter, local_prefix); + int is_direct = dx_field_iterator_prefix(iter, direct_prefix); dx_field_iterator_free(iter); if (addr) { @@ -404,9 +434,10 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // // - // Interpret and update the delivery annotations of the message + // Interpret and update the delivery annotations of the message. As a convenience, + // this function returns the iterator to the ingress field (if it exists). // - router_annotate_message(router, msg); + dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg); // // Forward to the in-process handler for this message if there is one. The @@ -446,31 +477,54 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del } // - // Forward to the next-hops for remote destinations. + // If the address form is direct to this router node, don't relay it on + // to any other part of the network. // - dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); - dx_router_link_t *dest_link; - while (dest_node_ref) { - if (dest_node_ref->router->next_hop) - dest_link = dest_node_ref->router->next_hop->peer_link; - else - dest_link = dest_node_ref->router->peer_link; - if (dest_link) { - dx_routed_event_t *re = new_dx_routed_event_t(); - DEQ_ITEM_INIT(re); - re->delivery = 0; - re->message = dx_message_copy(msg); - re->settle = 0; - re->disposition = 0; - DEQ_INSERT_TAIL(dest_link->msg_fifo, re); - - fanout++; - if (fanout == 1) - re->delivery = delivery; - - dx_link_activate(dest_link->link); + if (!is_direct) { + // + // Get the mask bit associated with the ingress router for the message. + // This will be compared against the "valid_origin" masks for each + // candidate destination router. + // + int origin = -1; + if (ingress_iter) { + dx_address_t *origin_addr; + hash_retrieve(router->addr_hash, ingress_iter, (void*) &origin_addr); + if (origin_addr && DEQ_SIZE(origin_addr->rnodes) == 1) { + dx_router_ref_t *rref = DEQ_HEAD(origin_addr->rnodes); + origin = rref->router->mask_bit; + } + } + + // + // Forward to the next-hops for remote destinations. + // + if (origin >= 0) { + dx_router_ref_t *dest_node_ref = DEQ_HEAD(addr->rnodes); + dx_router_link_t *dest_link; + while (dest_node_ref) { + if (dest_node_ref->router->next_hop) + dest_link = dest_node_ref->router->next_hop->peer_link; + else + dest_link = dest_node_ref->router->peer_link; + if (dest_link && dx_bitmask_value(dest_node_ref->router->valid_origins, origin)) { + dx_routed_event_t *re = new_dx_routed_event_t(); + DEQ_ITEM_INIT(re); + re->delivery = 0; + re->message = dx_message_copy(msg); + re->settle = 0; + re->disposition = 0; + DEQ_INSERT_TAIL(dest_link->msg_fifo, re); + + fanout++; + if (fanout == 1) + re->delivery = delivery; + + dx_link_activate(dest_link->link); + } + dest_node_ref = DEQ_NEXT(dest_node_ref); + } } - dest_node_ref = DEQ_NEXT(dest_node_ref); } } } @@ -633,10 +687,10 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) if (is_router) { // - // If this is a router link, put it in the router_address link-list. + // If this is a router link, put it in the hello_address link-list. // - dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); - rlink->owning_addr = router->router_addr; + dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); + rlink->owning_addr = router->hello_addr; router->out_links_by_mask_bit[rlink->mask_bit] = rlink; } else { @@ -667,7 +721,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->addr_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(router->addrs, addr); } dx_field_iterator_free(iter); @@ -806,7 +860,7 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co rlink->mask_bit = mask_bit; rlink->link_type = DX_LINK_ROUTER; rlink->link_direction = DX_OUTGOING; - rlink->owning_addr = router->router_addr; + rlink->owning_addr = router->hello_addr; rlink->link = sender; rlink->connected_link = 0; rlink->peer_link = 0; @@ -814,9 +868,9 @@ static void router_outbound_open_handler(void *type_context, dx_connection_t *co DEQ_INIT(rlink->msg_fifo); // - // Add the new outgoing link to the router_address's list of links. + // Add the new outgoing link to the hello_address's list of links. // - dx_router_add_link_ref_LH(&router->router_addr->rlinks, rlink); + dx_router_add_link_ref_LH(&router->hello_addr->rlinks, rlink); // // Index this link from the by-maskbit index so we can later find it quickly @@ -867,6 +921,14 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) dx_container_register_node_type(dx, &router_node); } + size_t dplen = 9 + strlen(area) + strlen(id); + direct_prefix = (char*) malloc(dplen); + strcpy(direct_prefix, "_topo/"); + strcat(direct_prefix, area); + strcat(direct_prefix, "/"); + strcat(direct_prefix, id); + strcat(direct_prefix, "/"); + dx_router_t *router = NEW(dx_router_t); router_node.type_context = router; @@ -883,8 +945,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) DEQ_INIT(router->routers); router->out_links_by_mask_bit = NEW_PTR_ARRAY(dx_router_link_t, dx_bitmask_width()); - for (int idx = 0; idx < dx_bitmask_width(); idx++) + router->routers_by_mask_bit = NEW_PTR_ARRAY(dx_router_node_t, dx_bitmask_width()); + for (int idx = 0; idx < dx_bitmask_width(); idx++) { router->out_links_by_mask_bit[idx] = 0; + router->routers_by_mask_bit[idx] = 0; + } router->neighbor_free_mask = dx_bitmask(1); router->lock = sys_mutex(); @@ -894,10 +959,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router->pyTick = 0; // - // Create an address for all of the routers in the topology. It will be registered + // Create addresses for all of the routers in the topology. It will be registered // locally later in the initialization sequence. // router->router_addr = dx_router_register_address(dx, "qdxrouter", 0, 0); + router->hello_addr = dx_router_register_address(dx, "qdxhello", 0, 0); // // Inform the field iterator module of this router's id and area. The field iterator @@ -935,8 +1001,7 @@ void dx_router_free(dx_router_t *router) const char *dx_router_id(const dx_dispatch_t *dx) { - dx_router_t *router = dx->router; - return router->router_id; + return direct_prefix; } @@ -963,7 +1028,7 @@ dx_address_t *dx_router_register_address(dx_dispatch_t *dx, addr->handler_context = 0; DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); - hash_insert(router->addr_hash, iter, addr); + hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); DEQ_ITEM_INIT(addr); DEQ_INSERT_TAIL(router->addrs, addr); } diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h index 7f1f7486fb..a21e4f1d5c 100644 --- a/qpid/extras/dispatch/src/router_private.h +++ b/qpid/extras/dispatch/src/router_private.h @@ -66,10 +66,11 @@ DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); struct dx_router_node_t { DEQ_LINKS(dx_router_node_t); - const char *id; + dx_address_t *owning_addr; int mask_bit; dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node + uint32_t ref_count; dx_bitmask_t *valid_origins; }; @@ -107,6 +108,7 @@ struct dx_address_t { void *handler_context; // In-Process Consumer context dx_router_link_ref_list_t rlinks; // Locally-Connected Consumers dx_router_ref_list_t rnodes; // Remotely-Connected Consumers + hash_handle_t *hash_handle; // Linkage back to the hash table entry }; ALLOC_DECLARE(dx_address_t); @@ -122,10 +124,12 @@ struct dx_router_t { dx_address_list_t addrs; hash_t *addr_hash; dx_address_t *router_addr; + dx_address_t *hello_addr; dx_router_link_list_t links; dx_router_node_list_t routers; dx_router_link_t **out_links_by_mask_bit; + dx_router_node_t **routers_by_mask_bit; dx_bitmask_t *neighbor_free_mask; sys_mutex_t *lock; @@ -136,4 +140,12 @@ struct dx_router_t { PyObject *pyTick; }; + +void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); +void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); + +void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); +void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); + + #endif diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c index 9930c5dde9..7ada6dad4b 100644 --- a/qpid/extras/dispatch/src/router_pynode.c +++ b/qpid/extras/dispatch/src/router_pynode.c @@ -34,34 +34,268 @@ typedef struct { } RouterAdapter; -static PyObject* dx_router_node_updated(PyObject *self, PyObject *args) +static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit) { - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; + if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) + return "Router bit mask out of range"; + + if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1) + return "Link bit mask out of range"; + + sys_mutex_lock(router->lock); + if (router->routers_by_mask_bit[router_maskbit] != 0) { + sys_mutex_unlock(router->lock); + return "Adding router over already existing router"; + } + + if (link_maskbit >= 0 && router->out_links_by_mask_bit[link_maskbit] == 0) { + sys_mutex_unlock(router->lock); + return "Adding neighbor router with invalid link reference"; + } + + // + // Hash lookup the address to ensure there isn't an existing router address. + // + dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH); + dx_address_t *addr; + + hash_retrieve(router->addr_hash, iter, (void**) &addr); + assert(addr == 0); + + // + // Create an address record for this router and insert it in the hash table. + // This record will be found whenever a "foreign" topological address to this + // remote router is looked up. + // + addr = new_dx_address_t(); + DEQ_ITEM_INIT(addr); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(router->addrs, addr); + + // + // Create a router-node record to represent the remote router. + // + dx_router_node_t *rnode = new_dx_router_node_t(); + DEQ_ITEM_INIT(rnode); + rnode->owning_addr = addr; + rnode->mask_bit = router_maskbit; + rnode->next_hop = 0; + rnode->peer_link = 0; + rnode->ref_count = 0; + rnode->valid_origins = dx_bitmask(0); + + DEQ_INSERT_TAIL(router->routers, rnode); + + // + // Link the router record to the address record. + // + dx_router_add_node_ref_LH(&addr->rnodes, rnode); + + // + // Link the router record to the router address record. + // + dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode); + + // + // Add the router record to the mask-bit index. + // + router->routers_by_mask_bit[router_maskbit] = rnode; + + // + // If this is a neighbor router, add the peer_link reference to the + // router record. + // + if (link_maskbit >= 0) + rnode->peer_link = router->out_links_by_mask_bit[link_maskbit]; + + sys_mutex_unlock(router->lock); + return 0; +} + + +static char *dx_del_router(dx_router_t *router, int router_maskbit) +{ + if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) + return "Router bit mask out of range"; + + sys_mutex_lock(router->lock); + if (router->routers_by_mask_bit[router_maskbit] == 0) { + sys_mutex_unlock(router->lock); + return "Deleting nonexistent router"; + } + + dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit]; + dx_address_t *oaddr = rnode->owning_addr; + assert(oaddr); + + // + // Unlink the router node from the address record + // + dx_router_del_node_ref_LH(&oaddr->rnodes, rnode); + + // + // While the router node has a non-zero reference count, look for addresses + // to unlink the node from. + // + dx_address_t *addr = DEQ_HEAD(router->addrs); + while (addr && rnode->ref_count > 0) { + dx_router_del_node_ref_LH(&addr->rnodes, rnode); + addr = DEQ_NEXT(addr); + } + assert(rnode->ref_count == 0); + + // + // Free the router node and the owning address records. + // + dx_bitmask_free(rnode->valid_origins); + free_dx_router_node_t(rnode); + + hash_remove_by_handle(router->addr_hash, oaddr->hash_handle); + DEQ_REMOVE(router->addrs, oaddr); + hash_handle_free(oaddr->hash_handle); + router->routers_by_mask_bit[router_maskbit] = 0; + free_dx_address_t(oaddr); + + sys_mutex_unlock(router->lock); + return 0; +} + + +static PyObject* dx_add_remote_router(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; const char *address; - int is_reachable; - int is_neighbor; - int link_maskbit; int router_maskbit; - if (!PyArg_ParseTuple(args, "siiii", &address, &is_reachable, &is_neighbor, - &link_maskbit, &router_maskbit)) + if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit)) return 0; - // TODO + char *error = dx_add_router(router, address, router_maskbit, -1); + if (error) { + PyErr_SetString(PyExc_Exception, error); + return 0; + } Py_INCREF(Py_None); return Py_None; } -static PyObject* dx_router_add_route(PyObject *self, PyObject *args) +static PyObject* dx_del_remote_router(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + int router_maskbit; + + if (!PyArg_ParseTuple(args, "i", &router_maskbit)) + return 0; + + char *error = dx_del_router(router, router_maskbit); + if (error) { + PyErr_SetString(PyExc_Exception, error); + return 0; + } + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_set_next_hop(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + int router_maskbit; + int next_hop_maskbit; + + if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit)) + return 0; + + if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range"); + return 0; + } + + if (router->routers_by_mask_bit[router_maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Router Not Found"); + return 0; + } + + if (router->routers_by_mask_bit[next_hop_maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Next Hop Not Found"); + return 0; + } + + if (router_maskbit != next_hop_maskbit) { + dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit]; + rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit]; + } + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + const char *address; + int router_maskbit; + int link_maskbit; + + if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit)) + return 0; + + char *error = dx_add_router(router, address, router_maskbit, link_maskbit); + if (error) { + PyErr_SetString(PyExc_Exception, error); + return 0; + } + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args) +{ + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + int router_maskbit; + + if (!PyArg_ParseTuple(args, "i", &router_maskbit)) + return 0; + + char *error = dx_del_router(router, router_maskbit); + if (error) { + PyErr_SetString(PyExc_Exception, error); + return 0; + } + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_map_destination(PyObject *self, PyObject *args) { //RouterAdapter *adapter = (RouterAdapter*) self; - const char *addr; - const char *peer; + //dx_router_t *router = adapter->router; + const char *addr; + int router_maskbit; - if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) return 0; // TODO @@ -71,13 +305,14 @@ static PyObject* dx_router_add_route(PyObject *self, PyObject *args) } -static PyObject* dx_router_del_route(PyObject *self, PyObject *args) +static PyObject* dx_unmap_destination(PyObject *self, PyObject *args) { //RouterAdapter *adapter = (RouterAdapter*) self; - const char *addr; - const char *peer; + //dx_router_t *router = adapter->router; + const char *addr; + int router_maskbit; - if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) return 0; // TODO @@ -88,9 +323,13 @@ static PyObject* dx_router_del_route(PyObject *self, PyObject *args) static PyMethodDef RouterAdapter_methods[] = { - {"node_updated", dx_router_node_updated, METH_VARARGS, "Update the status of a remote router node"}, - {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, - {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, + {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"}, + {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"}, + {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"}, + {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"}, + {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"}, + {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"}, + {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"}, {0, 0, 0, 0} }; diff --git a/qpid/extras/dispatch/tests/field_test.c b/qpid/extras/dispatch/tests/field_test.c index 59fc3c4cfb..33f9fe5d03 100644 --- a/qpid/extras/dispatch/tests/field_test.c +++ b/qpid/extras/dispatch/tests/field_test.c @@ -115,6 +115,10 @@ static char* test_view_address_hash(void *context) {"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"}, {"amqp:/_topo/all/all/local/sub", "Llocal/sub"}, {"amqp://host:port/_local/my-addr", "Lmy-addr"}, + {"_topo/area/router/my-addr", "Aarea"}, + {"_topo/my-area/router/my-addr", "Rrouter"}, + {"_topo/my-area/my-router/my-addr", "Lmy-addr"}, + {"_topo/my-area/router", "Rrouter"}, {0, 0} }; int idx; diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py index 7e3940fda4..47917b0f5f 100644 --- a/qpid/extras/dispatch/tests/router_engine_test.py +++ b/qpid/extras/dispatch/tests/router_engine_test.py @@ -97,62 +97,74 @@ class NodeTrackerTest(unittest.TestCase): def log(self, level, text): pass - def node_updated(self, address, reachable, neighbor, link_bit, router_bit): + def add_neighbor_router(self, address, router_bit, link_bit): self.address = address - self.reachable = reachable - self.neighbor = neighbor + self.router_bit = router_bit self.link_bit = link_bit + self.calls += 1 + + def del_neighbor_router(self, router_bit): + self.address = None self.router_bit = router_bit + self.link_bit = None + self.calls += 1 - def reset(self): + def add_remote_router(self, address, router_bit): + self.address = address + self.router_bit = router_bit + self.link_bit = None + self.calls += 1 + + def del_remote_router(self, router_bit): self.address = None - self.reachable = None - self.neighbor = None + self.router_bit = router_bit self.link_bit = None + self.calls += 1 + + def reset(self): + self.address = None self.router_bit = None + self.link_bit = None + self.area = "area" + self.calls = 0 def test_node_tracker_limits(self): tracker = NodeTracker(self, 5) self.reset() tracker.new_neighbor('A', 1) - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/A') self.assertEqual(self.link_bit, 1) self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('B', 5) - self.assertEqual(self.address, 'RB') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/B') self.assertEqual(self.link_bit, 5) self.assertEqual(self.router_bit, 1) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('C', 6) - self.assertEqual(self.address, 'RC') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/C') self.assertEqual(self.link_bit, 6) self.assertEqual(self.router_bit, 2) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('D', 7) - self.assertEqual(self.address, 'RD') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/D') self.assertEqual(self.link_bit, 7) self.assertEqual(self.router_bit, 3) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('E', 8) - self.assertEqual(self.address, 'RE') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/E') self.assertEqual(self.link_bit, 8) self.assertEqual(self.router_bit, 4) + self.assertEqual(self.calls, 1) self.reset() try: @@ -163,16 +175,15 @@ class NodeTrackerTest(unittest.TestCase): self.reset() tracker.lost_neighbor('C') - self.assertEqual(self.address, 'RC') - self.assertFalse(self.reachable) + self.assertEqual(self.router_bit, 2) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('F', 9) - self.assertEqual(self.address, 'RF') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/F') self.assertEqual(self.link_bit, 9) self.assertEqual(self.router_bit, 2) + self.assertEqual(self.calls, 1) def test_node_tracker_remote_neighbor(self): @@ -180,32 +191,29 @@ class NodeTrackerTest(unittest.TestCase): self.reset() tracker.new_node('A') - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertFalse(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/A') self.assertFalse(self.link_bit) self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 1) self.reset() tracker.new_neighbor('A', 3) - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/A') self.assertEqual(self.link_bit, 3) self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 2) self.reset() tracker.lost_node('A') - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) - self.assertEqual(self.link_bit, 3) - self.assertEqual(self.router_bit, 0) + self.assertFalse(self.address) + self.assertFalse(self.link_bit) + self.assertFalse(self.router_bit) + self.assertEqual(self.calls, 0) self.reset() tracker.lost_neighbor('A') - self.assertEqual(self.address, 'RA') - self.assertFalse(self.reachable) + self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 1) def test_node_tracker_neighbor_remote(self): @@ -213,32 +221,28 @@ class NodeTrackerTest(unittest.TestCase): self.reset() tracker.new_neighbor('A', 3) - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) + self.assertEqual(self.address, 'amqp:/_topo/area/A') self.assertEqual(self.link_bit, 3) self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 1) self.reset() tracker.new_node('A') - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertTrue(self.neighbor) - self.assertEqual(self.link_bit, 3) - self.assertEqual(self.router_bit, 0) + self.assertFalse(self.address) + self.assertFalse(self.link_bit) + self.assertFalse(self.router_bit) + self.assertEqual(self.calls, 0) self.reset() tracker.lost_neighbor('A') - self.assertEqual(self.address, 'RA') - self.assertTrue(self.reachable) - self.assertFalse(self.neighbor) - self.assertFalse(self.link_bit) + self.assertEqual(self.address, 'amqp:/_topo/area/A') self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 2) self.reset() tracker.lost_node('A') - self.assertEqual(self.address, 'RA') - self.assertFalse(self.reachable) + self.assertEqual(self.router_bit, 0) + self.assertEqual(self.calls, 1) class NeighborTest(unittest.TestCase): @@ -274,7 +278,7 @@ class NeighborTest(unittest.TestCase): self.engine.tick(1.5) self.assertEqual(len(self.sent), 1) dest, msg = self.sent.pop(0) - self.assertEqual(dest, "_local/qdxrouter") + self.assertEqual(dest, "amqp:/_local/qdxhello") self.assertEqual(msg.get_opcode(), "HELLO") self.assertEqual(msg.id, self.id) self.assertEqual(msg.area, self.area) diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py index b25568d283..8d486e2b85 100644 --- a/qpid/extras/dispatch/tests/system_tests_one_router.py +++ b/qpid/extras/dispatch/tests/system_tests_one_router.py @@ -310,7 +310,7 @@ class RouterTest(unittest.TestCase): self.assertEqual(i, rm.body['number']) da = rm.instructions self.assertEqual(da.__class__, dict) - self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A') + self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/') self.assertFalse('qdx.trace' in da) ## @@ -346,7 +346,7 @@ class RouterTest(unittest.TestCase): self.assertEqual(i, rm.body['number']) da = rm.instructions self.assertEqual(da.__class__, dict) - self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A') + self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/') self.assertFalse('qdx.trace' in da) ## @@ -364,8 +364,8 @@ class RouterTest(unittest.TestCase): self.assertEqual(i, rm.body['number']) da = rm.instructions self.assertEqual(da.__class__, dict) - self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A') - self.assertEqual(da['qdx.trace'], ['Qpid.Dispatch.Router.A']) + self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/') + self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/']) ## ## Non-empty trace @@ -382,8 +382,8 @@ class RouterTest(unittest.TestCase): self.assertEqual(i, rm.body['number']) da = rm.instructions self.assertEqual(da.__class__, dict) - self.assertEqual(da['qdx.ingress'], 'Qpid.Dispatch.Router.A') - self.assertEqual(da['qdx.trace'], ['first.hop', 'Qpid.Dispatch.Router.A']) + self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/') + self.assertEqual(da['qdx.trace'], ['first.hop', '_topo/area/Qpid.Dispatch.Router.A/']) M1.stop() M2.stop() diff --git a/qpid/extras/dispatch/tests/tworouters-A.conf b/qpid/extras/dispatch/tests/tworouters-A.conf new file mode 100644 index 0000000000..b4a5770c2d --- /dev/null +++ b/qpid/extras/dispatch/tests/tworouters-A.conf @@ -0,0 +1,54 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License +## + + +## +## Container section - Configures the general operation of the AMQP container. +## +container { + ## + ## worker-threads - The number of threads that will be created to + ## process message traffic and other application work (timers, non-amqp + ## file descriptors, etc.) + ## + ## The number of threads should be related to the number of available + ## processor cores. To fully utilize a quad-core system, set the + ## number of threads to 4. + ## + worker-threads: 4 + + ## + ## container-name - The name of the AMQP container. If not specified, + ## the container name will be set to a value of the container's + ## choosing. The automatically assigned container name is not + ## guaranteed to be persistent across restarts of the container. + ## + container-name: Qpid.Dispatch.Router.A +} + + +## +## Listeners and Connectors +## +listener { + addr: 0.0.0.0 + port: 20001 + sasl-mechanisms: ANONYMOUS +} + diff --git a/qpid/extras/dispatch/tests/tworouters-B.conf b/qpid/extras/dispatch/tests/tworouters-B.conf new file mode 100644 index 0000000000..d56bd45078 --- /dev/null +++ b/qpid/extras/dispatch/tests/tworouters-B.conf @@ -0,0 +1,61 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License +## + + +## +## Container section - Configures the general operation of the AMQP container. +## +container { + ## + ## worker-threads - The number of threads that will be created to + ## process message traffic and other application work (timers, non-amqp + ## file descriptors, etc.) + ## + ## The number of threads should be related to the number of available + ## processor cores. To fully utilize a quad-core system, set the + ## number of threads to 4. + ## + worker-threads: 4 + + ## + ## container-name - The name of the AMQP container. If not specified, + ## the container name will be set to a value of the container's + ## choosing. The automatically assigned container name is not + ## guaranteed to be persistent across restarts of the container. + ## + container-name: Qpid.Dispatch.Router.B +} + + +## +## Listeners and Connectors +## +listener { + addr: 0.0.0.0 + port: 20002 + sasl-mechanisms: ANONYMOUS +} + +connector { + label: Router Uplink + addr: 0.0.0.0 + port: 20001 + sasl-mechanisms: ANONYMOUS +} + |
