diff options
| -rw-r--r-- | qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py | 99 | ||||
| -rw-r--r-- | qpid/extras/dispatch/python/qpid/dispatch/router/binding.py | 134 | ||||
| -rw-r--r-- | qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py | 87 | ||||
| -rw-r--r-- | qpid/extras/dispatch/python/qpid/dispatch/router/node.py | 28 | ||||
| -rw-r--r-- | qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py | 37 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_agent.c | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 51 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_private.h | 9 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_pynode.c | 129 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/system_tests_one_router.py | 4 | ||||
| -rwxr-xr-x | qpid/extras/dispatch/tools/src/py/qdstat | 7 |
11 files changed, 249 insertions, 338 deletions
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py deleted file mode 100644 index d21f834751..0000000000 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from ..stubs import * - -ENTRY_OLD = 1 -ENTRY_CURRENT = 2 -ENTRY_NEW = 3 - -class AdapterEngine(object): - """ - This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). - Key binding lists are kept in disjoint key-classes that can come from different parts of the router - (i.e. topological keys for inter-router communication and mobile keys for end users). - - For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the - routing tables to be efficiently communicated to the adapter in the form of table deltas. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.key_classes = {} # map [key_class] => (addr-key, next-hop) - - - def tick(self, now): - """ - There is no periodic processing needed for this module. - """ - pass - - - def remote_routes_changed(self, key_class, new_table): - old_table = [] - if key_class in self.key_classes: - old_table = self.key_classes[key_class] - - # flag all of the old entries - old_flags = {} - for a,b in old_table: - old_flags[(a,b)] = ENTRY_OLD - - # flag the new entries - new_flags = {} - for a,b in new_table: - new_flags[(a,b)] = ENTRY_NEW - - # calculate the differences from old to new - for a,b in new_table: - if old_table.count((a,b)) > 0: - old_flags[(a,b)] = ENTRY_CURRENT - new_flags[(a,b)] = ENTRY_CURRENT - - # make to_add and to_delete lists - to_add = [] - to_delete = [] - for (a,b),f in old_flags.items(): - if f == ENTRY_OLD: - to_delete.append((a,b)) - for (a,b),f in new_flags.items(): - if f == ENTRY_NEW: - to_add.append((a,b)) - - # set the routing table to the new contents - self.key_classes[key_class] = new_table - - # update the adapter's routing tables - # Note: Do deletions before adds to avoid overlapping routes that may cause - # messages to be duplicated. It's better to have gaps in the routing - # tables momentarily because unroutable messages are stored for retry. - for a,b in to_delete: - self.container.router_adapter.remote_unbind(a, b) - for a,b in to_add: - self.container.router_adapter.remote_bind(a, b) - - self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) - for a,b in new_table: - self.container.log(LOG_INFO, " %s => %s" % (a, b)) - - diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py deleted file mode 100644 index db62f6e8a5..0000000000 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py +++ /dev/null @@ -1,134 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -try: - from dispatch import * -except ImportError: - from ..stubs import * - - -class BindingEngine(object): - """ - This module is responsible for responding to two different events: - 1) The learning of new remote mobile addresses - 2) The change of topology (i.e. different next-hops for remote routers) - When these occur, this module converts the mobile routing table (address => router) - to a next-hop routing table (address => next-hop), compresses the keys in case there - are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. - """ - def __init__(self, container): - self.container = container - self.id = self.container.id - self.area = self.container.area - self.current_keys = {} - - - def tick(self, now): - pass - - - def mobile_keys_changed(self, keys): - self.current_keys = keys - next_hop_keys = self._convert_ids_to_next_hops(keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def next_hops_changed(self): - next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) - routing_table = self._compress_keys(next_hop_keys) - self.container.remote_routes_changed('mobile-key', routing_table) - - - def _convert_ids_to_next_hops(self, keys): - next_hops = self.container.get_next_hops() - new_keys = {} - for _id, value in keys.items(): - if _id in next_hops: - next_hop = next_hops[_id] - if next_hop not in new_keys: - new_keys[next_hop] = [] - new_keys[next_hop].extend(value) - return new_keys - - - def _compress_keys(self, keys): - trees = {} - for _id, key_list in keys.items(): - trees[_id] = TopicElementList() - for key in key_list: - trees[_id].add_key(key) - routing_table = [] - for _id, tree in trees.items(): - tree_keys = tree.get_list() - for tk in tree_keys: - routing_table.append((tk, _id)) - return routing_table - - -class TopicElementList(object): - """ - """ - def __init__(self): - self.elements = {} # map text => (terminal, sub-list) - - def __repr__(self): - return "%r" % self.elements - - def add_key(self, key): - self.add_tokens(key.split('.')) - - def add_tokens(self, tokens): - first = tokens.pop(0) - terminal = len(tokens) == 0 - - if terminal and first == '#': - ## Optimization #1A (A.B.C.D followed by A.B.#) - self.elements = {'#':(True, TopicElementList())} - return - - if '#' in self.elements: - _t,_el = self.elements['#'] - if _t: - ## Optimization #1B (A.B.# followed by A.B.C.D) - return - - if first not in self.elements: - self.elements[first] = (terminal, TopicElementList()) - else: - _t,_el = self.elements[first] - if terminal and not _t: - self.elements[first] = (terminal, _el) - - if not terminal: - _t,_el = self.elements[first] - _el.add_tokens(tokens) - - def get_list(self): - keys = [] - for token, (_t,_el) in self.elements.items(): - if _t: keys.append(token) - _el.build_list(token, keys) - return keys - - def build_list(self, prefix, keys): - for token, (_t,_el) in self.elements.items(): - if _t: keys.append("%s.%s" % (prefix, token)) - _el.build_list("%s.%s" % (prefix, token), keys) - diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py index 0707f7f250..0e27910e42 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py @@ -31,18 +31,18 @@ class MobileAddressEngine(object): Note that this routing table maps from the mobile address to the remote router where that address is directly bound. """ - def __init__(self, container): + def __init__(self, container, node_tracker): self.container = container + self.node_tracker = node_tracker self.id = self.container.id self.area = self.container.area self.mobile_addr_max_age = self.container.config.mobile_addr_max_age self.mobile_seq = 0 - self.local_keys = [] - self.added_keys = [] - self.deleted_keys = [] - self.remote_lists = {} # map router_id => (sequence, list of keys) + self.local_addrs = [] + self.added_addrs = [] + self.deleted_addrs = [] + self.remote_lists = {} # map router_id => (sequence, list of addrs) self.remote_last_seen = {} # map router_id => time of last seen advertizement/update - self.remote_changed = False self.needed_mars = {} @@ -51,48 +51,41 @@ class MobileAddressEngine(object): self._send_mars() ## - ## If local keys have changed, collect the changes and send a MAU with the diffs + ## If local addrs have changed, collect the changes and send a MAU with the diffs ## Note: it is important that the differential-MAU be sent before a RA is sent ## - if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: + if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0: self.mobile_seq += 1 self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area, - MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) - self.local_keys.extend(self.added_keys) - for key in self.deleted_keys: - self.local_keys.remove(key) - self.added_keys = [] - self.deleted_keys = [] + MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs)) + self.local_addrs.extend(self.added_addrs) + for addr in self.deleted_addrs: + self.local_addrs.remove(addr) + self.added_addrs = [] + self.deleted_addrs = [] self.container.mobile_sequence_changed(self.mobile_seq) - ## - ## If remotes have changed, start the process of updating local bindings - ## - if self.remote_changed: - self.remote_changed = False - self._update_remote_keys() - - def add_local_address(self, key): + def add_local_address(self, addr): """ """ - if self.local_keys.count(key) == 0: - if self.added_keys.count(key) == 0: - self.added_keys.append(key) + if self.local_addrs.count(addr) == 0: + if self.added_addrs.count(addr) == 0: + self.added_addrs.append(addr) else: - if self.deleted_keys.count(key) > 0: - self.deleted_keys.remove(key) + if self.deleted_addrs.count(addr) > 0: + self.deleted_addrs.remove(addr) - def del_local_address(self, key): + def del_local_address(self, addr): """ """ - if self.local_keys.count(key) > 0: - if self.deleted_keys.count(key) == 0: - self.deleted_keys.append(key) + if self.local_addrs.count(addr) > 0: + if self.deleted_addrs.count(addr) == 0: + self.deleted_addrs.append(addr) else: - if self.added_keys.count(key) > 0: - self.added_keys.remove(key) + if self.added_addrs.count(addr) > 0: + self.added_addrs.remove(addr) def handle_ra(self, msg, now): @@ -131,7 +124,8 @@ class MobileAddressEngine(object): return self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) self.remote_last_seen[msg.id] = now - self.remote_changed = True + (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list) + self._activate_remotes(msg.id, add_list, del_list) else: ## ## Differential MAU @@ -148,10 +142,12 @@ class MobileAddressEngine(object): if msg.add_list and msg.add_list.__class__ == list: _list.extend(msg.add_list) if msg.del_list and msg.del_list.__class__ == list: - for key in msg.del_list: - _list.remove(key) + for addr in msg.del_list: + _list.remove(addr) self.remote_lists[msg.id] = (msg.mobile_seq, _list) - self.remote_changed = True + self.node_tracker.add_addresses(msg.id, msg.add_list) + self.node_tracker.del_addresses(msg.id, msg.del_list) + self._activate_remotes(msg.id, msg.add_list, msg.del_list) else: self.needed_mars[(msg.id, msg.area, _seq)] = None else: @@ -163,14 +159,7 @@ class MobileAddressEngine(object): return if msg.have_seq < self.mobile_seq: self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id), - MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) - - - def _update_remote_keys(self): - keys = {} - for _id,(seq,key_list) in self.remote_lists.items(): - keys[_id] = key_list - self.container.mobile_keys_changed(keys) + MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs)) def _expire_remotes(self, now): @@ -186,3 +175,11 @@ class MobileAddressEngine(object): self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) self.needed_mars = {} + + def _activate_remotes(self, _id, added, deleted): + bit = self.node_tracker.maskbit_for_node(_id) + for a in added: + self.container.router_adapter.map_destination(a, bit) + for d in deleted: + self.container.router_adapter.unmap_destination(d, bit) + diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py index f1abb5702a..ef697428da 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py @@ -113,6 +113,33 @@ class NodeTracker(object): return None + def add_addresses(self, node_id, addrs): + node = self.nodes[node_id] + for a in addrs: + node.addrs[a] = 1 + + + def del_addresses(self, node_id, addrs): + node = self.nodes[node_id] + for a in addrs: + node.addrs.pop(a) + + + def overwrite_addresses(self, node_id, addrs): + node = self.nodes[node_id] + added = [] + deleted = [] + for a in addrs: + if a not in node.addrs.keys(): + added.append(a) + for a in node.addrs.keys(): + if a not in addrs: + deleted.append(a) + for a in addrs: + node.addrs[a] = 1 + return (added, deleted) + + def _allocate_maskbit(self): if self.next_maskbit == None: raise Exception("Exceeded Maximum Router Count") @@ -143,4 +170,5 @@ class RemoteNode(object): self.maskbit = maskbit self.neighbor = neighbor self.remote = not neighbor + self.addrs = {} # Address => Count at Node (1 only for the present) diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py index d5250872b2..5128d175a6 100644 --- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py +++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py @@ -27,8 +27,6 @@ from link import LinkStateEngine from path import PathEngine from mobile import MobileAddressEngine from routing import RoutingTableEngine -from binding import BindingEngine -from adapter import AdapterEngine from node import NodeTracker ## @@ -75,10 +73,8 @@ class RouterEngine: self.neighbor_engine = NeighborEngine(self) self.link_state_engine = LinkStateEngine(self) self.path_engine = PathEngine(self) - self.mobile_address_engine = MobileAddressEngine(self) + self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker) self.routing_table_engine = RoutingTableEngine(self, self.node_tracker) - self.binding_engine = BindingEngine(self) - self.adapter_engine = AdapterEngine(self) @@ -92,24 +88,26 @@ class RouterEngine: return self.id - def addLocalAddress(self, key): + def addressAdded(self, addr): """ """ try: - if key.find('_topo') == 0 or key.find('_local') == 0: + if addr.find('Mtemp.') == 0: return - self.mobile_address_engine.add_local_address(key) + if addr.find('M') == 0: + self.mobile_address_engine.add_local_address(addr[1:]) except Exception, e: self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) - def delLocalAddress(self, key): + def addressRemoved(self, addr): """ """ try: - if key.find('_topo') == 0 or key.find('_local') == 0: + if addr.find('Mtemp.') == 0: return - self.mobile_address_engine.del_local_address(key) + if key.find('M') == 0: + self.mobile_address_engine.del_local_address(addr[1:]) except Exception, e: self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) @@ -124,8 +122,6 @@ class RouterEngine: self.path_engine.tick(now) self.mobile_address_engine.tick(now) self.routing_table_engine.tick(now) - self.binding_engine.tick(now) - self.adapter_engine.tick(now) self.node_tracker.tick(now) except Exception, e: self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) @@ -190,14 +186,10 @@ class RouterEngine: return { 'help' : "Get list of supported values for kind", 'link-state' : "This router's link state", 'link-state-set' : "The set of link states from known routers", - 'next-hops' : "Next hops to each known router", - 'topo-table' : "Topological routing table", - 'mobile-table' : "Mobile key routing table" + 'next-hops' : "Next hops to each known router" } if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() if kind == 'next-hops' : return self.routing_table_engine.next_hops - if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} - if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} if kind == 'link-state-set' : copy = {} for _id,_ls in self.link_state_engine.collection.items(): @@ -249,7 +241,6 @@ class RouterEngine: def next_hops_changed(self, next_hop_table): self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) self.routing_table_engine.next_hops_changed(next_hop_table) - self.binding_engine.next_hops_changed() def valid_origins_changed(self, valid_origins): self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins) @@ -259,17 +250,9 @@ class RouterEngine: self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) self.link_state_engine.set_mobile_sequence(mobile_seq) - def mobile_keys_changed(self, keys): - self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) - self.binding_engine.mobile_keys_changed(keys) - def get_next_hops(self): return self.routing_table_engine.get_next_hops() - def remote_routes_changed(self, key_class, routes): - self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) - self.adapter_engine.remote_routes_changed(key_class, routes) - def new_neighbor(self, rid, link_id): self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id)) self.node_tracker.new_neighbor(rid, link_id) diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c index 0dee4841d5..d4b719732c 100644 --- a/qpid/extras/dispatch/src/router_agent.c +++ b/qpid/extras/dispatch/src/router_agent.c @@ -140,6 +140,8 @@ static void dx_router_query_address(dx_router_t *router, void *cor) dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress); dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress); dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit); + dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container); + dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container); addr = DEQ_NEXT(addr); dx_agent_value_complete(cor, addr != 0); } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 414d920428..84dd1c0fa7 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t * Depending on its policy, the address may be eligible for being closed out * (i.e. Logging its terminal statistics and freeing its resources). */ -static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr) +void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr) { if (addr == 0) return; @@ -314,7 +314,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link) } -static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg) +static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop) { dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg); dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0); @@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_mess // // If there is a trace field, append this router's ID to the trace. // + dx_compose_insert_string(out_da, DX_DA_TRACE); + dx_compose_start_list(out_da); if (trace) { - dx_compose_insert_string(out_da, DX_DA_TRACE); - dx_compose_start_list(out_da); - if (dx_parse_is_list(trace)) { uint32_t idx = 0; dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx); while (trace_item) { dx_field_iterator_t *iter = dx_parse_raw(trace_item); + if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix)) + *drop = 1; + dx_field_iterator_reset(iter); dx_compose_insert_string_iterator(out_da, iter); idx++; trace_item = dx_parse_sub_value(trace, idx); } } - - dx_compose_insert_string(out_da, direct_prefix); - dx_compose_end_list(out_da); } + dx_compose_insert_string(out_da, direct_prefix); + dx_compose_end_list(out_da); + // // If there is no ingress field, annotate the ingress as this router else // keep the original field. @@ -475,25 +477,26 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del // Interpret and update the delivery annotations of the message. As a convenience, // this function returns the iterator to the ingress field (if it exists). // - dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg); + int drop = 0; + dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop); // // Forward to the in-process handler for this message if there is one. The // actual invocation of the handler will occur later after we've released // the lock. // - if (addr->handler) { + if (!drop && addr->handler) { in_process_copy = dx_message_copy(msg); handler = addr->handler; handler_context = addr->handler_context; - addr->deliveries_egress++; + addr->deliveries_to_container++; } // // If the address form is local (i.e. is prefixed by _local), don't forward // outside of the router process. // - if (!is_local) { + if (!drop && !is_local) { // // Forward to all of the local links receiving this address. // @@ -725,6 +728,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) const char *r_src = pn_terminus_get_address(dx_link_remote_source(link)); int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link)); int is_router = dx_router_terminus_is_router(dx_link_remote_target(link)); + int propagate = 0; dx_field_iterator_t *iter = 0; if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) { @@ -745,15 +749,15 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) // // If this is an endpoint link with a source address, make sure the address is - // appropriate for endpoint links. If it is not a local or mobile address, (i.e. - // a router or area address), it cannot be bound to an endpoint link. + // appropriate for endpoint links. If it is not mobile address, it cannot be + // bound to an endpoint link. // if(r_src && !is_router && !is_dynamic) { iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH); unsigned char prefix = dx_field_iterator_octet(iter); dx_field_iterator_reset(iter); - if (prefix != 'L' && prefix != 'M') { + if (prefix != 'M') { dx_field_iterator_free(iter); pn_link_close(pn_link); dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src); @@ -819,15 +823,26 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link) dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(router->addrs, addr); } - dx_field_iterator_free(iter); rlink->owning_addr = addr; dx_router_add_link_ref_LH(&addr->rlinks, rlink); + + // + // If this is not a dynamic address and it is the first local subscription + // to the address, supply the address to the router module for propagation + // to other nodes. + // + propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1); } DEQ_INSERT_TAIL(router->links, rlink); sys_mutex_unlock(router->lock); + if (propagate) + dx_router_global_added(router, iter); + + if (iter) + dx_field_iterator_free(iter); pn_link_open(pn_link); return 0; } @@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *are router->dtag = 1; router->pyRouter = 0; router->pyTick = 0; + router->pyAdded = 0; + router->pyRemoved = 0; // // Create addresses for all of the routers in the topology. It will be registered @@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t *dx, // // Forward to all of the local links receiving this address. // - addr->deliveries_ingress++; + addr->deliveries_from_container++; dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks); while (dest_link_ref) { dx_routed_event_t *re = new_dx_routed_event_t(); diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h index 35664f8e29..9800189548 100644 --- a/qpid/extras/dispatch/src/router_private.h +++ b/qpid/extras/dispatch/src/router_private.h @@ -120,6 +120,8 @@ struct dx_address_t { uint64_t deliveries_ingress; uint64_t deliveries_egress; uint64_t deliveries_transit; + uint64_t deliveries_to_container; + uint64_t deliveries_from_container; }; ALLOC_DECLARE(dx_address_t); @@ -150,6 +152,8 @@ struct dx_router_t { PyObject *pyRouter; PyObject *pyTick; + PyObject *pyAdded; + PyObject *pyRemoved; dx_agent_class_t *class_router; dx_agent_class_t *class_link; @@ -158,11 +162,16 @@ struct dx_router_t { }; + +void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr); void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link); void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode); +void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter); +void dx_router_global_removed(dx_router_t *router, const char *addr); + #endif diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c index 0638c06c3e..358fa50447 100644 --- a/qpid/extras/dispatch/src/router_pynode.c +++ b/qpid/extras/dispatch/src/router_pynode.c @@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args) static PyObject* dx_map_destination(PyObject *self, PyObject *args) { - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; - const char *addr; - int router_maskbit; + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + const char *addr_string; + int maskbit; + dx_address_t *addr; + dx_field_iterator_t *iter; - if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) + if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) return 0; - // TODO + if (maskbit >= dx_bitmask_width() || maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (router->routers_by_mask_bit[maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Router Not Found"); + return 0; + } + + iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + memset(addr, 0, sizeof(dx_address_t)); + DEQ_ITEM_INIT(addr); + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(router->addrs, addr); + } + dx_field_iterator_free(iter); + + dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; + dx_router_add_node_ref_LH(&addr->rnodes, rnode); + + sys_mutex_unlock(router->lock); + + dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit); Py_INCREF(Py_None); return Py_None; @@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyObject *self, PyObject *args) static PyObject* dx_unmap_destination(PyObject *self, PyObject *args) { - //RouterAdapter *adapter = (RouterAdapter*) self; - //dx_router_t *router = adapter->router; - const char *addr; - int router_maskbit; + RouterAdapter *adapter = (RouterAdapter*) self; + dx_router_t *router = adapter->router; + const char *addr_string; + int maskbit; + dx_address_t *addr; - if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit)) + if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit)) return 0; - // TODO + if (maskbit >= dx_bitmask_width() || maskbit < 0) { + PyErr_SetString(PyExc_Exception, "Router bit mask out of range"); + return 0; + } + + if (router->routers_by_mask_bit[maskbit] == 0) { + PyErr_SetString(PyExc_Exception, "Router Not Found"); + return 0; + } + + dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit]; + dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH); + + sys_mutex_lock(router->lock); + dx_hash_retrieve(router->addr_hash, iter, (void**) &addr); + dx_field_iterator_free(iter); + + if (!addr) { + PyErr_SetString(PyExc_Exception, "Address Not Found"); + sys_mutex_unlock(router->lock); + return 0; + } + + dx_router_del_node_ref_LH(&addr->rnodes, rnode); + dx_router_check_addr_LH(router, addr); + sys_mutex_unlock(router->lock); + + dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit); Py_INCREF(Py_None); return Py_None; @@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t *router) dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); return; } + + router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded"); + if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method"); + return; + } + + router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved"); + if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method"); + return; + } } @@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *router) } } + +void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter) +{ + PyObject *pArgs; + PyObject *pValue; + + if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) { + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + char *address = (char*) dx_field_iterator_copy(iter); + + dx_python_lock(); + pArgs = PyTuple_New(1); + PyTuple_SetItem(pArgs, 0, PyString_FromString(address)); + pValue = PyObject_CallObject(router->pyAdded, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } + dx_python_unlock(); + + free(address); + } +} + + +void dx_router_global_removed(dx_router_t *router, const char *addr) +{ +} + diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py index 302030e463..1154b3ed37 100644 --- a/qpid/extras/dispatch/tests/system_tests_one_router.py +++ b/qpid/extras/dispatch/tests/system_tests_one_router.py @@ -370,7 +370,7 @@ class RouterTest(unittest.TestCase): da = rm.instructions self.assertEqual(da.__class__, dict) self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/') - self.assertFalse('qdx.trace' in da) + self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/']) ## ## Pre-existing ingress @@ -388,7 +388,7 @@ class RouterTest(unittest.TestCase): da = rm.instructions self.assertEqual(da.__class__, dict) self.assertEqual(da['qdx.ingress'], 'ingress-router') - self.assertFalse('qdx.trace' in da) + self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/']) ## ## Invalid trace type diff --git a/qpid/extras/dispatch/tools/src/py/qdstat b/qpid/extras/dispatch/tools/src/py/qdstat index 869d38cf66..3633548c00 100755 --- a/qpid/extras/dispatch/tools/src/py/qdstat +++ b/qpid/extras/dispatch/tools/src/py/qdstat @@ -102,7 +102,7 @@ class BusManager: self.M.start() self.M.route("amqp:/*", "amqp://%s/$1" % host) self.address = "amqp:/_local/agent" - self.reply = "amqp:/reply-address/0001" # FIX THIS! + self.reply = "amqp:/temp.reply-address/0001" # FIX THIS! self.M.subscribe(self.reply) def Disconnect(self): @@ -217,6 +217,8 @@ class BusManager: heads.append(Header("in", Header.COMMAS)) heads.append(Header("out", Header.COMMAS)) heads.append(Header("thru", Header.COMMAS)) + heads.append(Header("to-proc", Header.COMMAS)) + heads.append(Header("from-proc", Header.COMMAS)) rows = [] request = Message() @@ -243,6 +245,8 @@ class BusManager: row.append(addr['deliveries-ingress']) row.append(addr['deliveries-egress']) row.append(addr['deliveries-transit']) + row.append(addr['deliveries-to-container']) + row.append(addr['deliveries-from-container']) rows.append(row) title = "Router Addresses" sorter = Sorter(heads, rows, 'address', 0, True) @@ -316,7 +320,6 @@ def main(argv=None): print except Exception,e: print "Failed: %s - %s" % (e.__class__.__name__, e) - raise bm.Disconnect() # try to deallocate brokers return 1 |
