summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py99
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/binding.py134
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py87
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/node.py28
-rw-r--r--qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py37
-rw-r--r--qpid/extras/dispatch/src/router_agent.c2
-rw-r--r--qpid/extras/dispatch/src/router_node.c51
-rw-r--r--qpid/extras/dispatch/src/router_private.h9
-rw-r--r--qpid/extras/dispatch/src/router_pynode.c129
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py4
-rwxr-xr-xqpid/extras/dispatch/tools/src/py/qdstat7
11 files changed, 249 insertions, 338 deletions
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py b/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
deleted file mode 100644
index d21f834751..0000000000
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/adapter.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from ..stubs import *
-
-ENTRY_OLD = 1
-ENTRY_CURRENT = 2
-ENTRY_NEW = 3
-
-class AdapterEngine(object):
- """
- This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop).
- Key binding lists are kept in disjoint key-classes that can come from different parts of the router
- (i.e. topological keys for inter-router communication and mobile keys for end users).
-
- For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the
- routing tables to be efficiently communicated to the adapter in the form of table deltas.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.key_classes = {} # map [key_class] => (addr-key, next-hop)
-
-
- def tick(self, now):
- """
- There is no periodic processing needed for this module.
- """
- pass
-
-
- def remote_routes_changed(self, key_class, new_table):
- old_table = []
- if key_class in self.key_classes:
- old_table = self.key_classes[key_class]
-
- # flag all of the old entries
- old_flags = {}
- for a,b in old_table:
- old_flags[(a,b)] = ENTRY_OLD
-
- # flag the new entries
- new_flags = {}
- for a,b in new_table:
- new_flags[(a,b)] = ENTRY_NEW
-
- # calculate the differences from old to new
- for a,b in new_table:
- if old_table.count((a,b)) > 0:
- old_flags[(a,b)] = ENTRY_CURRENT
- new_flags[(a,b)] = ENTRY_CURRENT
-
- # make to_add and to_delete lists
- to_add = []
- to_delete = []
- for (a,b),f in old_flags.items():
- if f == ENTRY_OLD:
- to_delete.append((a,b))
- for (a,b),f in new_flags.items():
- if f == ENTRY_NEW:
- to_add.append((a,b))
-
- # set the routing table to the new contents
- self.key_classes[key_class] = new_table
-
- # update the adapter's routing tables
- # Note: Do deletions before adds to avoid overlapping routes that may cause
- # messages to be duplicated. It's better to have gaps in the routing
- # tables momentarily because unroutable messages are stored for retry.
- for a,b in to_delete:
- self.container.router_adapter.remote_unbind(a, b)
- for a,b in to_add:
- self.container.router_adapter.remote_bind(a, b)
-
- self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class)
- for a,b in new_table:
- self.container.log(LOG_INFO, " %s => %s" % (a, b))
-
-
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py b/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
deleted file mode 100644
index db62f6e8a5..0000000000
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/binding.py
+++ /dev/null
@@ -1,134 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-try:
- from dispatch import *
-except ImportError:
- from ..stubs import *
-
-
-class BindingEngine(object):
- """
- This module is responsible for responding to two different events:
- 1) The learning of new remote mobile addresses
- 2) The change of topology (i.e. different next-hops for remote routers)
- When these occur, this module converts the mobile routing table (address => router)
- to a next-hop routing table (address => next-hop), compresses the keys in case there
- are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class.
- """
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
- self.current_keys = {}
-
-
- def tick(self, now):
- pass
-
-
- def mobile_keys_changed(self, keys):
- self.current_keys = keys
- next_hop_keys = self._convert_ids_to_next_hops(keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def next_hops_changed(self):
- next_hop_keys = self._convert_ids_to_next_hops(self.current_keys)
- routing_table = self._compress_keys(next_hop_keys)
- self.container.remote_routes_changed('mobile-key', routing_table)
-
-
- def _convert_ids_to_next_hops(self, keys):
- next_hops = self.container.get_next_hops()
- new_keys = {}
- for _id, value in keys.items():
- if _id in next_hops:
- next_hop = next_hops[_id]
- if next_hop not in new_keys:
- new_keys[next_hop] = []
- new_keys[next_hop].extend(value)
- return new_keys
-
-
- def _compress_keys(self, keys):
- trees = {}
- for _id, key_list in keys.items():
- trees[_id] = TopicElementList()
- for key in key_list:
- trees[_id].add_key(key)
- routing_table = []
- for _id, tree in trees.items():
- tree_keys = tree.get_list()
- for tk in tree_keys:
- routing_table.append((tk, _id))
- return routing_table
-
-
-class TopicElementList(object):
- """
- """
- def __init__(self):
- self.elements = {} # map text => (terminal, sub-list)
-
- def __repr__(self):
- return "%r" % self.elements
-
- def add_key(self, key):
- self.add_tokens(key.split('.'))
-
- def add_tokens(self, tokens):
- first = tokens.pop(0)
- terminal = len(tokens) == 0
-
- if terminal and first == '#':
- ## Optimization #1A (A.B.C.D followed by A.B.#)
- self.elements = {'#':(True, TopicElementList())}
- return
-
- if '#' in self.elements:
- _t,_el = self.elements['#']
- if _t:
- ## Optimization #1B (A.B.# followed by A.B.C.D)
- return
-
- if first not in self.elements:
- self.elements[first] = (terminal, TopicElementList())
- else:
- _t,_el = self.elements[first]
- if terminal and not _t:
- self.elements[first] = (terminal, _el)
-
- if not terminal:
- _t,_el = self.elements[first]
- _el.add_tokens(tokens)
-
- def get_list(self):
- keys = []
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append(token)
- _el.build_list(token, keys)
- return keys
-
- def build_list(self, prefix, keys):
- for token, (_t,_el) in self.elements.items():
- if _t: keys.append("%s.%s" % (prefix, token))
- _el.build_list("%s.%s" % (prefix, token), keys)
-
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
index 0707f7f250..0e27910e42 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/mobile.py
@@ -31,18 +31,18 @@ class MobileAddressEngine(object):
Note that this routing table maps from the mobile address to the remote router where that address
is directly bound.
"""
- def __init__(self, container):
+ def __init__(self, container, node_tracker):
self.container = container
+ self.node_tracker = node_tracker
self.id = self.container.id
self.area = self.container.area
self.mobile_addr_max_age = self.container.config.mobile_addr_max_age
self.mobile_seq = 0
- self.local_keys = []
- self.added_keys = []
- self.deleted_keys = []
- self.remote_lists = {} # map router_id => (sequence, list of keys)
+ self.local_addrs = []
+ self.added_addrs = []
+ self.deleted_addrs = []
+ self.remote_lists = {} # map router_id => (sequence, list of addrs)
self.remote_last_seen = {} # map router_id => time of last seen advertizement/update
- self.remote_changed = False
self.needed_mars = {}
@@ -51,48 +51,41 @@ class MobileAddressEngine(object):
self._send_mars()
##
- ## If local keys have changed, collect the changes and send a MAU with the diffs
+ ## If local addrs have changed, collect the changes and send a MAU with the diffs
## Note: it is important that the differential-MAU be sent before a RA is sent
##
- if len(self.added_keys) > 0 or len(self.deleted_keys) > 0:
+ if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
self.mobile_seq += 1
self.container.send('amqp:/_topo/%s/all/qdxrouter' % self.area,
- MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys))
- self.local_keys.extend(self.added_keys)
- for key in self.deleted_keys:
- self.local_keys.remove(key)
- self.added_keys = []
- self.deleted_keys = []
+ MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_addrs, self.deleted_addrs))
+ self.local_addrs.extend(self.added_addrs)
+ for addr in self.deleted_addrs:
+ self.local_addrs.remove(addr)
+ self.added_addrs = []
+ self.deleted_addrs = []
self.container.mobile_sequence_changed(self.mobile_seq)
- ##
- ## If remotes have changed, start the process of updating local bindings
- ##
- if self.remote_changed:
- self.remote_changed = False
- self._update_remote_keys()
-
- def add_local_address(self, key):
+ def add_local_address(self, addr):
"""
"""
- if self.local_keys.count(key) == 0:
- if self.added_keys.count(key) == 0:
- self.added_keys.append(key)
+ if self.local_addrs.count(addr) == 0:
+ if self.added_addrs.count(addr) == 0:
+ self.added_addrs.append(addr)
else:
- if self.deleted_keys.count(key) > 0:
- self.deleted_keys.remove(key)
+ if self.deleted_addrs.count(addr) > 0:
+ self.deleted_addrs.remove(addr)
- def del_local_address(self, key):
+ def del_local_address(self, addr):
"""
"""
- if self.local_keys.count(key) > 0:
- if self.deleted_keys.count(key) == 0:
- self.deleted_keys.append(key)
+ if self.local_addrs.count(addr) > 0:
+ if self.deleted_addrs.count(addr) == 0:
+ self.deleted_addrs.append(addr)
else:
- if self.added_keys.count(key) > 0:
- self.added_keys.remove(key)
+ if self.added_addrs.count(addr) > 0:
+ self.added_addrs.remove(addr)
def handle_ra(self, msg, now):
@@ -131,7 +124,8 @@ class MobileAddressEngine(object):
return
self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
self.remote_last_seen[msg.id] = now
- self.remote_changed = True
+ (add_list, del_list) = self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
+ self._activate_remotes(msg.id, add_list, del_list)
else:
##
## Differential MAU
@@ -148,10 +142,12 @@ class MobileAddressEngine(object):
if msg.add_list and msg.add_list.__class__ == list:
_list.extend(msg.add_list)
if msg.del_list and msg.del_list.__class__ == list:
- for key in msg.del_list:
- _list.remove(key)
+ for addr in msg.del_list:
+ _list.remove(addr)
self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- self.remote_changed = True
+ self.node_tracker.add_addresses(msg.id, msg.add_list)
+ self.node_tracker.del_addresses(msg.id, msg.del_list)
+ self._activate_remotes(msg.id, msg.add_list, msg.del_list)
else:
self.needed_mars[(msg.id, msg.area, _seq)] = None
else:
@@ -163,14 +159,7 @@ class MobileAddressEngine(object):
return
if msg.have_seq < self.mobile_seq:
self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (msg.area, msg.id),
- MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys))
-
-
- def _update_remote_keys(self):
- keys = {}
- for _id,(seq,key_list) in self.remote_lists.items():
- keys[_id] = key_list
- self.container.mobile_keys_changed(keys)
+ MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_addrs))
def _expire_remotes(self, now):
@@ -186,3 +175,11 @@ class MobileAddressEngine(object):
self.container.send('amqp:/_topo/%s/%s/qdxrouter' % (_area, _id), MessageMAR(None, self.id, self.area, _seq))
self.needed_mars = {}
+
+ def _activate_remotes(self, _id, added, deleted):
+ bit = self.node_tracker.maskbit_for_node(_id)
+ for a in added:
+ self.container.router_adapter.map_destination(a, bit)
+ for d in deleted:
+ self.container.router_adapter.unmap_destination(d, bit)
+
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
index f1abb5702a..ef697428da 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/node.py
@@ -113,6 +113,33 @@ class NodeTracker(object):
return None
+ def add_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs[a] = 1
+
+
+ def del_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ for a in addrs:
+ node.addrs.pop(a)
+
+
+ def overwrite_addresses(self, node_id, addrs):
+ node = self.nodes[node_id]
+ added = []
+ deleted = []
+ for a in addrs:
+ if a not in node.addrs.keys():
+ added.append(a)
+ for a in node.addrs.keys():
+ if a not in addrs:
+ deleted.append(a)
+ for a in addrs:
+ node.addrs[a] = 1
+ return (added, deleted)
+
+
def _allocate_maskbit(self):
if self.next_maskbit == None:
raise Exception("Exceeded Maximum Router Count")
@@ -143,4 +170,5 @@ class RemoteNode(object):
self.maskbit = maskbit
self.neighbor = neighbor
self.remote = not neighbor
+ self.addrs = {} # Address => Count at Node (1 only for the present)
diff --git a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
index d5250872b2..5128d175a6 100644
--- a/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
+++ b/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
@@ -27,8 +27,6 @@ from link import LinkStateEngine
from path import PathEngine
from mobile import MobileAddressEngine
from routing import RoutingTableEngine
-from binding import BindingEngine
-from adapter import AdapterEngine
from node import NodeTracker
##
@@ -75,10 +73,8 @@ class RouterEngine:
self.neighbor_engine = NeighborEngine(self)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
- self.mobile_address_engine = MobileAddressEngine(self)
+ self.mobile_address_engine = MobileAddressEngine(self, self.node_tracker)
self.routing_table_engine = RoutingTableEngine(self, self.node_tracker)
- self.binding_engine = BindingEngine(self)
- self.adapter_engine = AdapterEngine(self)
@@ -92,24 +88,26 @@ class RouterEngine:
return self.id
- def addLocalAddress(self, key):
+ def addressAdded(self, addr):
"""
"""
try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
+ if addr.find('Mtemp.') == 0:
return
- self.mobile_address_engine.add_local_address(key)
+ if addr.find('M') == 0:
+ self.mobile_address_engine.add_local_address(addr[1:])
except Exception, e:
self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e)
- def delLocalAddress(self, key):
+ def addressRemoved(self, addr):
"""
"""
try:
- if key.find('_topo') == 0 or key.find('_local') == 0:
+ if addr.find('Mtemp.') == 0:
return
- self.mobile_address_engine.del_local_address(key)
+ if key.find('M') == 0:
+ self.mobile_address_engine.del_local_address(addr[1:])
except Exception, e:
self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e)
@@ -124,8 +122,6 @@ class RouterEngine:
self.path_engine.tick(now)
self.mobile_address_engine.tick(now)
self.routing_table_engine.tick(now)
- self.binding_engine.tick(now)
- self.adapter_engine.tick(now)
self.node_tracker.tick(now)
except Exception, e:
self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e)
@@ -190,14 +186,10 @@ class RouterEngine:
return { 'help' : "Get list of supported values for kind",
'link-state' : "This router's link state",
'link-state-set' : "The set of link states from known routers",
- 'next-hops' : "Next hops to each known router",
- 'topo-table' : "Topological routing table",
- 'mobile-table' : "Mobile key routing table"
+ 'next-hops' : "Next hops to each known router"
}
if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict()
if kind == 'next-hops' : return self.routing_table_engine.next_hops
- if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']}
- if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']}
if kind == 'link-state-set' :
copy = {}
for _id,_ls in self.link_state_engine.collection.items():
@@ -249,7 +241,6 @@ class RouterEngine:
def next_hops_changed(self, next_hop_table):
self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
self.routing_table_engine.next_hops_changed(next_hop_table)
- self.binding_engine.next_hops_changed()
def valid_origins_changed(self, valid_origins):
self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
@@ -259,17 +250,9 @@ class RouterEngine:
self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
self.link_state_engine.set_mobile_sequence(mobile_seq)
- def mobile_keys_changed(self, keys):
- self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys)
- self.binding_engine.mobile_keys_changed(keys)
-
def get_next_hops(self):
return self.routing_table_engine.get_next_hops()
- def remote_routes_changed(self, key_class, routes):
- self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes))
- self.adapter_engine.remote_routes_changed(key_class, routes)
-
def new_neighbor(self, rid, link_id):
self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d" % (rid, link_id))
self.node_tracker.new_neighbor(rid, link_id)
diff --git a/qpid/extras/dispatch/src/router_agent.c b/qpid/extras/dispatch/src/router_agent.c
index 0dee4841d5..d4b719732c 100644
--- a/qpid/extras/dispatch/src/router_agent.c
+++ b/qpid/extras/dispatch/src/router_agent.c
@@ -140,6 +140,8 @@ static void dx_router_query_address(dx_router_t *router, void *cor)
dx_agent_value_uint(cor, "deliveries-ingress", addr->deliveries_ingress);
dx_agent_value_uint(cor, "deliveries-egress", addr->deliveries_egress);
dx_agent_value_uint(cor, "deliveries-transit", addr->deliveries_transit);
+ dx_agent_value_uint(cor, "deliveries-to-container", addr->deliveries_to_container);
+ dx_agent_value_uint(cor, "deliveries-from-container", addr->deliveries_from_container);
addr = DEQ_NEXT(addr);
dx_agent_value_complete(cor, addr != 0);
}
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 414d920428..84dd1c0fa7 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -107,7 +107,7 @@ void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-static void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr)
{
if (addr == 0)
return;
@@ -314,7 +314,7 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
}
-static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg)
+static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_message_t *msg, int *drop)
{
dx_parsed_field_t *in_da = dx_message_delivery_annotations(msg);
dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
@@ -333,25 +333,27 @@ static dx_field_iterator_t *router_annotate_message(dx_router_t *router, dx_mess
//
// If there is a trace field, append this router's ID to the trace.
//
+ dx_compose_insert_string(out_da, DX_DA_TRACE);
+ dx_compose_start_list(out_da);
if (trace) {
- dx_compose_insert_string(out_da, DX_DA_TRACE);
- dx_compose_start_list(out_da);
-
if (dx_parse_is_list(trace)) {
uint32_t idx = 0;
dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
while (trace_item) {
dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+ if (dx_field_iterator_equal(iter, (unsigned char*) direct_prefix))
+ *drop = 1;
+ dx_field_iterator_reset(iter);
dx_compose_insert_string_iterator(out_da, iter);
idx++;
trace_item = dx_parse_sub_value(trace, idx);
}
}
-
- dx_compose_insert_string(out_da, direct_prefix);
- dx_compose_end_list(out_da);
}
+ dx_compose_insert_string(out_da, direct_prefix);
+ dx_compose_end_list(out_da);
+
//
// If there is no ingress field, annotate the ingress as this router else
// keep the original field.
@@ -475,25 +477,26 @@ static void router_rx_handler(void* context, dx_link_t *link, dx_delivery_t *del
// Interpret and update the delivery annotations of the message. As a convenience,
// this function returns the iterator to the ingress field (if it exists).
//
- dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg);
+ int drop = 0;
+ dx_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
//
// Forward to the in-process handler for this message if there is one. The
// actual invocation of the handler will occur later after we've released
// the lock.
//
- if (addr->handler) {
+ if (!drop && addr->handler) {
in_process_copy = dx_message_copy(msg);
handler = addr->handler;
handler_context = addr->handler_context;
- addr->deliveries_egress++;
+ addr->deliveries_to_container++;
}
//
// If the address form is local (i.e. is prefixed by _local), don't forward
// outside of the router process.
//
- if (!is_local) {
+ if (!drop && !is_local) {
//
// Forward to all of the local links receiving this address.
//
@@ -725,6 +728,7 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
const char *r_src = pn_terminus_get_address(dx_link_remote_source(link));
int is_dynamic = pn_terminus_is_dynamic(dx_link_remote_source(link));
int is_router = dx_router_terminus_is_router(dx_link_remote_target(link));
+ int propagate = 0;
dx_field_iterator_t *iter = 0;
if (is_router && !dx_router_connection_is_inter_router(dx_link_connection(link))) {
@@ -745,15 +749,15 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
//
// If this is an endpoint link with a source address, make sure the address is
- // appropriate for endpoint links. If it is not a local or mobile address, (i.e.
- // a router or area address), it cannot be bound to an endpoint link.
+ // appropriate for endpoint links. If it is not mobile address, it cannot be
+ // bound to an endpoint link.
//
if(r_src && !is_router && !is_dynamic) {
iter = dx_field_iterator_string(r_src, ITER_VIEW_ADDRESS_HASH);
unsigned char prefix = dx_field_iterator_octet(iter);
dx_field_iterator_reset(iter);
- if (prefix != 'L' && prefix != 'M') {
+ if (prefix != 'M') {
dx_field_iterator_free(iter);
pn_link_close(pn_link);
dx_log(module, LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
@@ -819,15 +823,26 @@ static int router_outgoing_link_handler(void* context, dx_link_t *link)
dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
}
- dx_field_iterator_free(iter);
rlink->owning_addr = addr;
dx_router_add_link_ref_LH(&addr->rlinks, rlink);
+
+ //
+ // If this is not a dynamic address and it is the first local subscription
+ // to the address, supply the address to the router module for propagation
+ // to other nodes.
+ //
+ propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
}
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
+ if (propagate)
+ dx_router_global_added(router, iter);
+
+ if (iter)
+ dx_field_iterator_free(iter);
pn_link_open(pn_link);
return 0;
}
@@ -1059,6 +1074,8 @@ dx_router_t *dx_router(dx_dispatch_t *dx, dx_router_mode_t mode, const char *are
router->dtag = 1;
router->pyRouter = 0;
router->pyTick = 0;
+ router->pyAdded = 0;
+ router->pyRemoved = 0;
//
// Create addresses for all of the routers in the topology. It will be registered
@@ -1172,7 +1189,7 @@ void dx_router_send(dx_dispatch_t *dx,
//
// Forward to all of the local links receiving this address.
//
- addr->deliveries_ingress++;
+ addr->deliveries_from_container++;
dx_router_link_ref_t *dest_link_ref = DEQ_HEAD(addr->rlinks);
while (dest_link_ref) {
dx_routed_event_t *re = new_dx_routed_event_t();
diff --git a/qpid/extras/dispatch/src/router_private.h b/qpid/extras/dispatch/src/router_private.h
index 35664f8e29..9800189548 100644
--- a/qpid/extras/dispatch/src/router_private.h
+++ b/qpid/extras/dispatch/src/router_private.h
@@ -120,6 +120,8 @@ struct dx_address_t {
uint64_t deliveries_ingress;
uint64_t deliveries_egress;
uint64_t deliveries_transit;
+ uint64_t deliveries_to_container;
+ uint64_t deliveries_from_container;
};
ALLOC_DECLARE(dx_address_t);
@@ -150,6 +152,8 @@ struct dx_router_t {
PyObject *pyRouter;
PyObject *pyTick;
+ PyObject *pyAdded;
+ PyObject *pyRemoved;
dx_agent_class_t *class_router;
dx_agent_class_t *class_link;
@@ -158,11 +162,16 @@ struct dx_router_t {
};
+
+void dx_router_check_addr_LH(dx_router_t *router, dx_address_t *addr);
void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_del_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link);
void dx_router_add_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
void dx_router_del_node_ref_LH(dx_router_ref_list_t *ref_list, dx_router_node_t *rnode);
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter);
+void dx_router_global_removed(dx_router_t *router, const char *addr);
+
#endif
diff --git a/qpid/extras/dispatch/src/router_pynode.c b/qpid/extras/dispatch/src/router_pynode.c
index 0638c06c3e..358fa50447 100644
--- a/qpid/extras/dispatch/src/router_pynode.c
+++ b/qpid/extras/dispatch/src/router_pynode.c
@@ -347,15 +347,48 @@ static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
static PyObject* dx_map_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
+ dx_field_iterator_t *iter;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- // TODO
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ memset(addr, 0, sizeof(dx_address_t));
+ DEQ_ITEM_INIT(addr);
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_ITEM_INIT(addr);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ }
+ dx_field_iterator_free(iter);
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+
+ sys_mutex_unlock(router->lock);
+
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -364,15 +397,43 @@ static PyObject* dx_map_destination(PyObject *self, PyObject *args)
static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
{
- //RouterAdapter *adapter = (RouterAdapter*) self;
- //dx_router_t *router = adapter->router;
- const char *addr;
- int router_maskbit;
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ dx_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+ dx_address_t *addr;
- if (!PyArg_ParseTuple(args, "si", &addr, &router_maskbit))
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- // TODO
+ if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
+ return 0;
+ }
+
+ if (router->routers_by_mask_bit[maskbit] == 0) {
+ PyErr_SetString(PyExc_Exception, "Router Not Found");
+ return 0;
+ }
+
+ dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+
+ sys_mutex_lock(router->lock);
+ dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ dx_field_iterator_free(iter);
+
+ if (!addr) {
+ PyErr_SetString(PyExc_Exception, "Address Not Found");
+ sys_mutex_unlock(router->lock);
+ return 0;
+ }
+
+ dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ dx_router_check_addr_LH(router, addr);
+ sys_mutex_unlock(router->lock);
+
+ dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -534,6 +595,18 @@ void dx_router_python_setup(dx_router_t *router)
dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
return;
}
+
+ router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded");
+ if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
+ return;
+ }
+
+ router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved");
+ if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
+ return;
+ }
}
@@ -557,3 +630,35 @@ void dx_pyrouter_tick(dx_router_t *router)
}
}
+
+void dx_router_global_added(dx_router_t *router, dx_field_iterator_t *iter)
+{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ char *address = (char*) dx_field_iterator_copy(iter);
+
+ dx_python_lock();
+ pArgs = PyTuple_New(1);
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
+ pValue = PyObject_CallObject(router->pyAdded, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+ dx_python_unlock();
+
+ free(address);
+ }
+}
+
+
+void dx_router_global_removed(dx_router_t *router, const char *addr)
+{
+}
+
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
index 302030e463..1154b3ed37 100644
--- a/qpid/extras/dispatch/tests/system_tests_one_router.py
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -370,7 +370,7 @@ class RouterTest(unittest.TestCase):
da = rm.instructions
self.assertEqual(da.__class__, dict)
self.assertEqual(da['qdx.ingress'], '_topo/area/Qpid.Dispatch.Router.A/')
- self.assertFalse('qdx.trace' in da)
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Pre-existing ingress
@@ -388,7 +388,7 @@ class RouterTest(unittest.TestCase):
da = rm.instructions
self.assertEqual(da.__class__, dict)
self.assertEqual(da['qdx.ingress'], 'ingress-router')
- self.assertFalse('qdx.trace' in da)
+ self.assertEqual(da['qdx.trace'], ['_topo/area/Qpid.Dispatch.Router.A/'])
##
## Invalid trace type
diff --git a/qpid/extras/dispatch/tools/src/py/qdstat b/qpid/extras/dispatch/tools/src/py/qdstat
index 869d38cf66..3633548c00 100755
--- a/qpid/extras/dispatch/tools/src/py/qdstat
+++ b/qpid/extras/dispatch/tools/src/py/qdstat
@@ -102,7 +102,7 @@ class BusManager:
self.M.start()
self.M.route("amqp:/*", "amqp://%s/$1" % host)
self.address = "amqp:/_local/agent"
- self.reply = "amqp:/reply-address/0001" # FIX THIS!
+ self.reply = "amqp:/temp.reply-address/0001" # FIX THIS!
self.M.subscribe(self.reply)
def Disconnect(self):
@@ -217,6 +217,8 @@ class BusManager:
heads.append(Header("in", Header.COMMAS))
heads.append(Header("out", Header.COMMAS))
heads.append(Header("thru", Header.COMMAS))
+ heads.append(Header("to-proc", Header.COMMAS))
+ heads.append(Header("from-proc", Header.COMMAS))
rows = []
request = Message()
@@ -243,6 +245,8 @@ class BusManager:
row.append(addr['deliveries-ingress'])
row.append(addr['deliveries-egress'])
row.append(addr['deliveries-transit'])
+ row.append(addr['deliveries-to-container'])
+ row.append(addr['deliveries-from-container'])
rows.append(row)
title = "Router Addresses"
sorter = Sorter(heads, rows, 'address', 0, True)
@@ -316,7 +320,6 @@ def main(argv=None):
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
- raise
bm.Disconnect() # try to deallocate brokers
return 1