diff options
| author | Ted Ross <tross@apache.org> | 2013-06-28 13:42:12 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-06-28 13:42:12 +0000 |
| commit | 9f4d5ee381667a129d926ddd3df5735f839f7bbc (patch) | |
| tree | 17166f4423e10118a4c01fb6b6eddd355621fe24 /qpid/extras/dispatch | |
| parent | 914b52c41d17046d4549cd7f9f55d3c356ff8de5 (diff) | |
| download | qpid-python-9f4d5ee381667a129d926ddd3df5735f839f7bbc.tar.gz | |
QPID-4967 - Added the Python routing engine and integrated its tests into ctest
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1497770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch')
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c | 10 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/__init__.py | 20 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/adapter.py | 102 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/binding.py | 138 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/configuration.py | 47 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/data.py | 269 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/link.py | 143 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/mobile.py | 183 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/neighbor.py | 85 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/path.py | 205 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/router_engine.py | 246 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/routing.py | 59 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/router_engine_test.py | 410 |
15 files changed, 1914 insertions, 6 deletions
diff --git a/qpid/extras/dispatch/src/agent.c b/qpid/extras/dispatch/src/agent.c index 6a284fc87d..70f31d1522 100644 --- a/qpid/extras/dispatch/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -168,7 +168,7 @@ static void dx_agent_process_request(dx_agent_t *agent, dx_message_t *msg) return; // - // Try to get a map-view of the application-properties. Exit if the it is not a map-value. + // Try to get a map-view of the application-properties. Exit if it is not a map-value. // dx_field_map_t *map = dx_field_map(ap, 1); if (map == 0) { diff --git a/qpid/extras/dispatch/src/message.c b/qpid/extras/dispatch/src/message.c index 8047552c2b..8576bd5146 100644 --- a/qpid/extras/dispatch/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -95,7 +95,7 @@ static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field break; } - if (field) { + if (field && !field->parsed) { field->buffer = *buffer; field->offset = *cursor - dx_buffer_base(*buffer); field->length = consume; @@ -280,7 +280,7 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess result = traverse_field(&cursor, &buffer, 0); // message_id if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // user_id + result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id if (!result) return 0; result = traverse_field(&cursor, &buffer, &content->field_to); // to if (!result) return 0; @@ -301,14 +301,14 @@ static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_mess int count = start_list(&cursor, &buffer); int result; - if (count < 3) + if (count < 5) break; result = traverse_field(&cursor, &buffer, 0); // message_id if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // user_id + result = traverse_field(&cursor, &buffer, &content->field_user_id); // user_id if (!result) return 0; - result = traverse_field(&cursor, &buffer, 0); // to + result = traverse_field(&cursor, &buffer, &content->field_to); // to if (!result) return 0; result = traverse_field(&cursor, &buffer, 0); // subject if (!result) return 0; diff --git a/qpid/extras/dispatch/src/py/router/__init__.py b/qpid/extras/dispatch/src/py/router/__init__.py new file mode 100644 index 0000000000..bfc600d469 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from router.router_engine import * diff --git a/qpid/extras/dispatch/src/py/router/adapter.py b/qpid/extras/dispatch/src/py/router/adapter.py new file mode 100644 index 0000000000..b4a77d487e --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/adapter.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +ENTRY_OLD = 1 +ENTRY_CURRENT = 2 +ENTRY_NEW = 3 + +class AdapterEngine(object): + """ + This module is responsible for managing the Adapter's key bindings (list of address-subject:next-hop). + Key binding lists are kept in disjoint key-classes that can come from different parts of the router + (i.e. topological keys for inter-router communication and mobile keys for end users). + + For each key-class, a mirror copy of what the adapter has is kept internally. This allows changes to the + routing tables to be efficiently communicated to the adapter in the form of table deltas. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.key_classes = {} # map [key_class] => (addr-key, next-hop) + + + def tick(self, now): + """ + There is no periodic processing needed for this module. + """ + pass + + + def remote_routes_changed(self, key_class, new_table): + old_table = [] + if key_class in self.key_classes: + old_table = self.key_classes[key_class] + + # flag all of the old entries + old_flags = {} + for a,b in old_table: + old_flags[(a,b)] = ENTRY_OLD + + # flag the new entries + new_flags = {} + for a,b in new_table: + new_flags[(a,b)] = ENTRY_NEW + + # calculate the differences from old to new + for a,b in new_table: + if old_table.count((a,b)) > 0: + old_flags[(a,b)] = ENTRY_CURRENT + new_flags[(a,b)] = ENTRY_CURRENT + + # make to_add and to_delete lists + to_add = [] + to_delete = [] + for (a,b),f in old_flags.items(): + if f == ENTRY_OLD: + to_delete.append((a,b)) + for (a,b),f in new_flags.items(): + if f == ENTRY_NEW: + to_add.append((a,b)) + + # set the routing table to the new contents + self.key_classes[key_class] = new_table + + # update the adapter's routing tables + # Note: Do deletions before adds to avoid overlapping routes that may cause + # messages to be duplicated. It's better to have gaps in the routing + # tables momentarily because unroutable messages are stored for retry. + for a,b in to_delete: + self.container.adapter.remote_unbind(a, b) + for a,b in to_add: + self.container.adapter.remote_bind(a, b) + + self.container.log(INFO, "New Routing Table (class=%s):" % key_class) + for a,b in new_table: + self.container.log(INFO, " %s => %s" % (a, b)) + + diff --git a/qpid/extras/dispatch/src/py/router/binding.py b/qpid/extras/dispatch/src/py/router/binding.py new file mode 100644 index 0000000000..5bec0208c9 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/binding.py @@ -0,0 +1,138 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class BindingEngine(object): + """ + This module is responsible for responding to two different events: + 1) The learning of new remote mobile addresses + 2) The change of topology (i.e. different next-hops for remote routers) + When these occur, this module converts the mobile routing table (address => router) + to a next-hop routing table (address => next-hop), compresses the keys in case there + are wild-card overlaps, and notifies outbound of changes in the "mobile-key" address class. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.current_keys = {} + + + def tick(self, now): + pass + + + def mobile_keys_changed(self, keys): + self.current_keys = keys + next_hop_keys = self._convert_ids_to_next_hops(keys) + routing_table = self._compress_keys(next_hop_keys) + self.container.remote_routes_changed('mobile-key', routing_table) + + + def next_hops_changed(self): + next_hop_keys = self._convert_ids_to_next_hops(self.current_keys) + routing_table = self._compress_keys(next_hop_keys) + self.container.remote_routes_changed('mobile-key', routing_table) + + + def _convert_ids_to_next_hops(self, keys): + next_hops = self.container.get_next_hops() + new_keys = {} + for _id, value in keys.items(): + if _id in next_hops: + next_hop = next_hops[_id] + if next_hop not in new_keys: + new_keys[next_hop] = [] + new_keys[next_hop].extend(value) + return new_keys + + + def _compress_keys(self, keys): + trees = {} + for _id, key_list in keys.items(): + trees[_id] = TopicElementList() + for key in key_list: + trees[_id].add_key(key) + routing_table = [] + for _id, tree in trees.items(): + tree_keys = tree.get_list() + for tk in tree_keys: + routing_table.append((tk, _id)) + return routing_table + + +class TopicElementList(object): + """ + """ + def __init__(self): + self.elements = {} # map text => (terminal, sub-list) + + def __repr__(self): + return "%r" % self.elements + + def add_key(self, key): + self.add_tokens(key.split('.')) + + def add_tokens(self, tokens): + first = tokens.pop(0) + terminal = len(tokens) == 0 + + if terminal and first == '#': + ## Optimization #1A (A.B.C.D followed by A.B.#) + self.elements = {'#':(True, TopicElementList())} + return + + if '#' in self.elements: + _t,_el = self.elements['#'] + if _t: + ## Optimization #1B (A.B.# followed by A.B.C.D) + return + + if first not in self.elements: + self.elements[first] = (terminal, TopicElementList()) + else: + _t,_el = self.elements[first] + if terminal and not _t: + self.elements[first] = (terminal, _el) + + if not terminal: + _t,_el = self.elements[first] + _el.add_tokens(tokens) + + def get_list(self): + keys = [] + for token, (_t,_el) in self.elements.items(): + if _t: keys.append(token) + _el.build_list(token, keys) + return keys + + def build_list(self, prefix, keys): + for token, (_t,_el) in self.elements.items(): + if _t: keys.append("%s.%s" % (prefix, token)) + _el.build_list("%s.%s" % (prefix, token), keys) + + + diff --git a/qpid/extras/dispatch/src/py/router/configuration.py b/qpid/extras/dispatch/src/py/router/configuration.py new file mode 100644 index 0000000000..f87d2ee7d2 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/configuration.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class Configuration(object): + """ + This module manages and holds the configuration and tuning parameters for a router. + """ + def __init__(self, overrides={}): + ## + ## Load default values + ## + self.values = { 'hello_interval' : 1.0, + 'hello_max_age' : 3.0, + 'ra_interval' : 30.0, + 'remote_ls_max_age' : 60.0, + 'mobile_addr_max_age' : 60.0 } + + ## + ## Apply supplied overrides + ## + for k, v in overrides.items(): + self.values[k] = v + + def __getattr__(self, key): + if key in self.values: + return self.values[key] + raise KeyError + + def __repr__(self): + return "%r" % self.values + diff --git a/qpid/extras/dispatch/src/py/router/data.py b/qpid/extras/dispatch/src/py/router/data.py new file mode 100644 index 0000000000..3fd938d30d --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/data.py @@ -0,0 +1,269 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +def getMandatory(data, key, cls=None): + """ + Get the value mapped to the requested key. If it's not present, raise an exception. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + raise Exception("Mandatory protocol field missing: '%s'" % key) + + +def getOptional(data, key, default=None, cls=None): + """ + Get the value mapped to the requested key. If it's not present, return the default value. + """ + if key in data: + value = data[key] + if cls and value.__class__ != cls: + raise Exception("Protocol field has wrong data type: '%s' type=%r expected=%r" % (key, value.__class__, cls)) + return value + return default + + +class LinkState(object): + """ + The link-state of a single router. The link state consists of a list of neighbor routers reachable from + the reporting router. The link-state-sequence number is incremented each time the link state changes. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None): + self.last_seen = 0 + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.peers = getMandatory(body, 'peers', list) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.peers = _peers + + def __repr__(self): + return "LS(id=%s area=%s ls_seq=%d peers=%r)" % (self.id, self.area, self.ls_seq, self.peers) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'peers' : self.peers} + + def add_peer(self, _id): + if self.peers.count(_id) == 0: + self.peers.append(_id) + return True + return False + + def del_peer(self, _id): + if self.peers.count(_id) > 0: + self.peers.remove(_id) + return True + return False + + def bump_sequence(self): + self.ls_seq += 1 + + +class MessageHELLO(object): + """ + HELLO Message + scope: neighbors only - HELLO messages travel at most one hop + This message is used by directly connected routers to determine with whom they have + bidirectional connectivity. + """ + def __init__(self, body, _id=None, _area=None, _seen_peers=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.seen_peers = getMandatory(body, 'seen', list) + else: + self.id = _id + self.area = _area + self.seen_peers = _seen_peers + + def __repr__(self): + return "HELLO(id=%s area=%s seen=%r)" % (self.id, self.area, self.seen_peers) + + def get_opcode(self): + return 'HELLO' + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'seen' : self.seen_peers} + + def is_seen(self, _id): + return self.seen_peers.count(_id) > 0 + + +class MessageRA(object): + """ + Router Advertisement (RA) Message + scope: all routers in the area and all designated routers + This message is sent periodically to indicate the originating router's sequence numbers + for link-state and mobile-address-state. + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _mobile_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.mobile_seq = long(_mobile_seq) + + def get_opcode(self): + return 'RA' + + def __repr__(self): + return "RA(id=%s area=%s ls_seq=%d mobile_seq=%d)" % \ + (self.id, self.area, self.ls_seq, self.mobile_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'mobile_seq' : self.mobile_seq} + + +class MessageLSU(object): + """ + """ + def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.ls_seq = getMandatory(body, 'ls_seq', long) + self.ls = LinkState(getMandatory(body, 'ls', dict)) + else: + self.id = _id + self.area = _area + self.ls_seq = long(_ls_seq) + self.ls = _ls + + def get_opcode(self): + return 'LSU' + + def __repr__(self): + return "LSU(id=%s area=%s ls_seq=%d ls=%r)" % \ + (self.id, self.area, self.ls_seq, self.ls) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'ls_seq' : self.ls_seq, + 'ls' : self.ls.to_dict()} + + +class MessageLSR(object): + """ + """ + def __init__(self, body, _id=None, _area=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + else: + self.id = _id + self.area = _area + + def get_opcode(self): + return 'LSR' + + def __repr__(self): + return "LSR(id=%s area=%s)" % (self.id, self.area) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area} + + +class MessageMAU(object): + """ + """ + def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.mobile_seq = getMandatory(body, 'mobile_seq', long) + self.add_list = getOptional(body, 'add', None, list) + self.del_list = getOptional(body, 'del', None, list) + self.exist_list = getOptional(body, 'exist', None, list) + else: + self.id = _id + self.area = _area + self.mobile_seq = long(_seq) + self.add_list = _add_list + self.del_list = _del_list + self.exist_list = _exist_list + + def get_opcode(self): + return 'MAU' + + def __repr__(self): + _add = '' + _del = '' + _exist = '' + if self.add_list: _add = ' add=%r' % self.add_list + if self.del_list: _del = ' del=%r' % self.del_list + if self.exist_list: _exist = ' exist=%r' % self.exist_list + return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ + (self.id, self.area, self.mobile_seq, _add, _del, _exist) + + def to_dict(self): + body = { 'id' : self.id, + 'area' : self.area, + 'mobile_seq' : self.mobile_seq } + if self.add_list: body['add'] = self.add_list + if self.del_list: body['del'] = self.del_list + if self.exist_list: body['exist'] = self.exist_list + return body + + +class MessageMAR(object): + """ + """ + def __init__(self, body, _id=None, _area=None, _have_seq=None): + if body: + self.id = getMandatory(body, 'id', str) + self.area = getMandatory(body, 'area', str) + self.have_seq = getMandatory(body, 'have_seq', long) + else: + self.id = _id + self.area = _area + self.have_seq = long(_have_seq) + + def get_opcode(self): + return 'MAR' + + def __repr__(self): + return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) + + def to_dict(self): + return {'id' : self.id, + 'area' : self.area, + 'have_seq' : self.have_seq} + diff --git a/qpid/extras/dispatch/src/py/router/link.py b/qpid/extras/dispatch/src/py/router/link.py new file mode 100644 index 0000000000..38494e71b7 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/link.py @@ -0,0 +1,143 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from data import MessageRA, MessageLSU, MessageLSR +from time import time + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class LinkStateEngine(object): + """ + This module is responsible for running the Link State protocol and maintaining the set + of link states that are gathered from the domain. It notifies outbound when changes to + the link-state-collection are detected. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.ra_interval = self.container.config.ra_interval + self.remote_ls_max_age = self.container.config.remote_ls_max_age + self.last_ra_time = 0 + self.collection = {} + self.collection_changed = False + self.mobile_seq = 0 + self.needed_lsrs = {} + + + def tick(self, now): + self._expire_ls(now) + self._send_lsrs() + + if now - self.last_ra_time >= self.ra_interval: + self.last_ra_time = now + self._send_ra() + + if self.collection_changed: + self.collection_changed = False + self.container.log(INFO, "New Link-State Collection:") + for a,b in self.collection.items(): + self.container.log(INFO, " %s => %r" % (a, b.peers)) + self.container.ls_collection_changed(self.collection) + + + def handle_ra(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + ls.last_seen = now + if ls.ls_seq < msg.ls_seq: + self.needed_lsrs[(msg.area, msg.id)] = None + else: + self.needed_lsrs[(msg.area, msg.id)] = None + + + def handle_lsu(self, msg, now): + if msg.id == self.id: + return + if msg.id in self.collection: + ls = self.collection[msg.id] + if ls.ls_seq < msg.ls_seq: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + else: + ls = msg.ls + self.collection[msg.id] = ls + self.collection_changed = True + ls.last_seen = now + self.container.log(INFO, "Learned link-state from new router: %s" % msg.id) + # Schedule LSRs for any routers referenced in this LS that we don't know about + for _id in msg.ls.peers: + if _id not in self.collection: + self.needed_lsrs[(msg.area, _id)] = None + + + def handle_lsr(self, msg, now): + if msg.id == self.id: + return + if self.id not in self.collection: + return + my_ls = self.collection[self.id] + self.container.send('_topo.%s.%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) + + + def new_local_link_state(self, link_state): + self.collection[self.id] = link_state + self.collection_changed = True + self._send_ra() + + def set_mobile_sequence(self, seq): + self.mobile_seq = seq + + + def get_collection(self): + return self.collection + + + def _expire_ls(self, now): + to_delete = [] + for key, ls in self.collection.items(): + if key != self.id and now - ls.last_seen > self.remote_ls_max_age: + to_delete.append(key) + for key in to_delete: + ls = self.collection.pop(key) + self.collection_changed = True + self.container.log(INFO, "Expired link-state from router: %s" % key) + + + def _send_lsrs(self): + for (_area, _id) in self.needed_lsrs.keys(): + self.container.send('_topo.%s.%s' % (_area, _id), MessageLSR(None, self.id, self.area)) + self.needed_lsrs = {} + + + def _send_ra(self): + ls_seq = 0 + if self.id in self.collection: + ls_seq = self.collection[self.id].ls_seq + self.container.send('_topo.%s.all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/src/py/router/mobile.py b/qpid/extras/dispatch/src/py/router/mobile.py new file mode 100644 index 0000000000..0dd53af649 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/mobile.py @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from data import MessageRA, MessageMAR, MessageMAU + +class MobileAddressEngine(object): + """ + This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. + It runs the Mobile-Address protocol and generates an un-optimized routing table for mobile addresses. + Note that this routing table maps from the mobile address to the remote router where that address + is directly bound. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.mobile_addr_max_age = self.container.config.mobile_addr_max_age + self.mobile_seq = 0 + self.local_keys = [] + self.added_keys = [] + self.deleted_keys = [] + self.remote_lists = {} # map router_id => (sequence, list of keys) + self.remote_last_seen = {} # map router_id => time of last seen advertizement/update + self.remote_changed = False + self.needed_mars = {} + + + def tick(self, now): + self._expire_remotes(now) + self._send_mars() + + ## + ## If local keys have changed, collect the changes and send a MAU with the diffs + ## Note: it is important that the differential-MAU be sent before a RA is sent + ## + if len(self.added_keys) > 0 or len(self.deleted_keys) > 0: + self.mobile_seq += 1 + self.container.send('_topo.%s.all' % self.area, + MessageMAU(None, self.id, self.area, self.mobile_seq, self.added_keys, self.deleted_keys)) + self.local_keys.extend(self.added_keys) + for key in self.deleted_keys: + self.local_keys.remove(key) + self.added_keys = [] + self.deleted_keys = [] + self.container.mobile_sequence_changed(self.mobile_seq) + + ## + ## If remotes have changed, start the process of updating local bindings + ## + if self.remote_changed: + self.remote_changed = False + self._update_remote_keys() + + + def add_local_address(self, key): + """ + """ + if self.local_keys.count(key) == 0: + if self.added_keys.count(key) == 0: + self.added_keys.append(key) + else: + if self.deleted_keys.count(key) > 0: + self.deleted_keys.remove(key) + + + def del_local_address(self, key): + """ + """ + if self.local_keys.count(key) > 0: + if self.deleted_keys.count(key) == 0: + self.deleted_keys.append(key) + else: + if self.added_keys.count(key) > 0: + self.added_keys.remove(key) + + + def handle_ra(self, msg, now): + if msg.id == self.id: + return + + if msg.mobile_seq == 0: + return + + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + self.remote_last_seen[msg.id] = now + if _seq < msg.mobile_seq: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mau(self, msg, now): + ## + ## If the MAU is differential, we can only use it if its sequence is exactly one greater + ## than our stored sequence. If not, we will ignore the content and schedule a MAR. + ## + ## If the MAU is absolute, we can use it in all cases. + ## + if msg.id == self.id: + return + + if msg.exist_list: + ## + ## Absolute MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq >= msg.mobile_seq: # ignore duplicates + return + self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list) + self.remote_last_seen[msg.id] = now + self.remote_changed = True + else: + ## + ## Differential MAU + ## + if msg.id in self.remote_lists: + _seq, _list = self.remote_lists[msg.id] + if _seq == msg.mobile_seq: # ignore duplicates + return + self.remote_last_seen[msg.id] = now + if _seq + 1 == msg.mobile_seq: + ## + ## This is one greater than our stored value, incorporate the deltas + ## + if msg.add_list and msg.add_list.__class__ == list: + _list.extend(msg.add_list) + if msg.del_list and msg.del_list.__class__ == list: + for key in msg.del_list: + _list.remove(key) + self.remote_lists[msg.id] = (msg.mobile_seq, _list) + self.remote_changed = True + else: + self.needed_mars[(msg.id, msg.area, _seq)] = None + else: + self.needed_mars[(msg.id, msg.area, 0)] = None + + + def handle_mar(self, msg, now): + if msg.id == self.id: + return + if msg.have_seq < self.mobile_seq: + self.container.send('_topo.%s.%s' % (msg.area, msg.id), + MessageMAU(None, self.id, self.area, self.mobile_seq, None, None, self.local_keys)) + + + def _update_remote_keys(self): + keys = {} + for _id,(seq,key_list) in self.remote_lists.items(): + keys[_id] = key_list + self.container.mobile_keys_changed(keys) + + + def _expire_remotes(self, now): + for _id, t in self.remote_last_seen.items(): + if now - t > self.mobile_addr_max_age: + self.remote_lists.pop(_id) + self.remote_last_seen.pop(_id) + self.remote_changed = True + + + def _send_mars(self): + for _id, _area, _seq in self.needed_mars.keys(): + self.container.send('_topo.%s.%s' % (_area, _id), MessageMAR(None, self.id, self.area, _seq)) + self.needed_mars = {} + diff --git a/qpid/extras/dispatch/src/py/router/neighbor.py b/qpid/extras/dispatch/src/py/router/neighbor.py new file mode 100644 index 0000000000..ece9c07ef1 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/neighbor.py @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from data import LinkState, MessageHELLO +from time import time + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class NeighborEngine(object): + """ + This module is responsible for maintaining this router's link-state. It runs the HELLO protocol + with the router's neighbors and notifies outbound when the list of neighbors-in-good-standing (the + link-state) changes. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.last_hello_time = 0.0 + self.hello_interval = container.config.hello_interval + self.hello_max_age = container.config.hello_max_age + self.hellos = {} + self.link_state_changed = False + self.link_state = LinkState(None, self.id, self.area, 0, []) + + + def tick(self, now): + self._expire_hellos(now) + + if now - self.last_hello_time >= self.hello_interval: + self.last_hello_time = now + self.container.send('_peer', MessageHELLO(None, self.id, self.area, self.hellos.keys())) + + if self.link_state_changed: + self.link_state_changed = False + self.link_state.bump_sequence() + self.container.local_link_state_changed(self.link_state) + + + def handle_hello(self, msg, now): + if msg.id == self.id: + return + self.hellos[msg.id] = now + if msg.is_seen(self.id): + if self.link_state.add_peer(msg.id): + self.link_state_changed = True + self.container.log(INFO, "New neighbor established: %s" % msg.id) + ## + ## TODO - Use this function to detect area boundaries + ## + + def _expire_hellos(self, now): + to_delete = [] + for key, last_seen in self.hellos.items(): + if now - last_seen > self.hello_max_age: + to_delete.append(key) + for key in to_delete: + self.hellos.pop(key) + if self.link_state.del_peer(key): + self.link_state_changed = True + self.container.log(INFO, "Neighbor lost: %s" % key) + + diff --git a/qpid/extras/dispatch/src/py/router/path.py b/qpid/extras/dispatch/src/py/router/path.py new file mode 100644 index 0000000000..018c967f7a --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/path.py @@ -0,0 +1,205 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class PathEngine(object): + """ + This module is responsible for computing the next-hop for every router/area in the domain + based on the collection of link states that have been gathered. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.recalculate = False + self.collection = None + + + def tick(self, now_unused): + if self.recalculate: + self.recalculate = False + self._calculate_routes() + + + def ls_collection_changed(self, collection): + self.recalculate = True + self.collection = collection + + + def _calculate_tree_from_root(self, root): + ## + ## Make a copy of the current collection of link-states that contains + ## an empty link-state for nodes that are known-peers but are not in the + ## collection currently. This is needed to establish routes to those nodes + ## so we can trade link-state information with them. + ## + link_states = {} + for _id, ls in self.collection.items(): + link_states[_id] = ls.peers + for p in ls.peers: + if p not in link_states: + link_states[p] = [] + + ## + ## Setup Dijkstra's Algorithm + ## + cost = {} + prev = {} + for _id in link_states: + cost[_id] = None # infinite + prev[_id] = None # undefined + cost[root] = 0 # no cost to the root node + unresolved = NodeSet(cost) + + ## + ## Process unresolved nodes until lowest cost paths to all reachable nodes have been found. + ## + while not unresolved.empty(): + u = unresolved.lowest_cost() + if cost[u] == None: + # There are no more reachable nodes in unresolved + break + for v in link_states[u]: + if unresolved.contains(v): + alt = cost[u] + 1 # TODO - Use link cost instead of 1 + if cost[v] == None or alt < cost[v]: + cost[v] = alt + prev[v] = u + unresolved.set_cost(v, alt) + + ## + ## Remove unreachable nodes from the map. Note that this will also remove the + ## root node (has no previous node) from the map. + ## + for u, val in prev.items(): + if not val: + prev.pop(u) + + ## + ## Return previous-node map. This is a map of all reachable, remote nodes to + ## their predecessor node. + ## + return prev + + + def _calculate_routes(self): + ## + ## Generate the shortest-path tree with the local node as root + ## + prev = self._calculate_tree_from_root(self.id) + nodes = prev.keys() + + ## + ## Distill the path tree into a map of next hops for each node + ## + next_hops = {} + while len(nodes) > 0: + u = nodes[0] # pick any destination + path = [u] + nodes.remove(u) + v = prev[u] + while v != self.id: # build a list of nodes in the path back to the root + if v in nodes: + path.append(v) + nodes.remove(v) + u = v + v = prev[u] + for w in path: # mark each node in the path as reachable via the next hop + next_hops[w] = u + + ## + ## TODO - Calculate the tree from each origin, determine the set of origins-per-dest + ## for which the path from origin to dest passes through us. This is the set + ## of valid origins for forwarding to the destination. + ## + + self.container.next_hops_changed(next_hops) + + +class NodeSet(object): + """ + This data structure is an ordered list of node IDs, sorted in increasing order by their cost. + Equal cost nodes are secondarily sorted by their ID in order to provide deterministic and + repeatable ordering. + """ + def __init__(self, cost_map): + self.nodes = [] + for _id, cost in cost_map.items(): + ## + ## Assume that nodes are either unreachable (cost = None) or local (cost = 0) + ## during this initialization. + ## + if cost == 0: + self.nodes.insert(0, (_id, cost)) + else: + ## + ## There is no need to sort unreachable nodes by ID + ## + self.nodes.append((_id, cost)) + + + def __repr__(self): + return self.nodes.__repr__() + + + def empty(self): + return len(self.nodes) == 0 + + + def contains(self, _id): + for a, b in self.nodes: + if a == _id: + return True + return False + + + def lowest_cost(self): + """ + Remove and return the lowest cost node ID. + """ + _id, cost = self.nodes.pop(0) + return _id + + + def set_cost(self, _id, new_cost): + """ + Set the cost for an ID in the NodeSet and re-insert the ID so that the list + remains sorted in increasing cost order. + """ + index = 0 + for i, c in self.nodes: + if i == _id: + break + index += 1 + self.nodes.pop(index) + + index = 0 + for i, c in self.nodes: + if c == None or new_cost < c or (new_cost == c and _id < i): + break + index += 1 + + self.nodes.insert(index, (_id, new_cost)) diff --git a/qpid/extras/dispatch/src/py/router/router_engine.py b/qpid/extras/dispatch/src/py/router/router_engine.py new file mode 100644 index 0000000000..e5bf5517b5 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/router_engine.py @@ -0,0 +1,246 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import time +from uuid import uuid4 + +from configuration import Configuration +from data import * +from neighbor import NeighborEngine +from link import LinkStateEngine +from path import PathEngine +from mobile import MobileAddressEngine +from routing import RoutingTableEngine +from binding import BindingEngine +from adapter import AdapterEngine + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class RouterEngine: + """ + """ + + def __init__(self, adapter, domain, router_id=None, area='area', config_override={}): + """ + Initialize an instance of a router for a domain. + """ + ## + ## Record important information about this router instance + ## + self.adapter = adapter + self.domain = domain + if router_id: + self.id = router_id + else: + self.id = str(uuid4()) + self.area = area + self.log(NOTICE, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) + + ## + ## Setup configuration + ## + self.config = Configuration(config_override) + self.log(INFO, "Config: %r" % self.config) + + ## + ## Launch the sub-module engines + ## + self.neighbor_engine = NeighborEngine(self) + self.link_state_engine = LinkStateEngine(self) + self.path_engine = PathEngine(self) + self.mobile_address_engine = MobileAddressEngine(self) + self.routing_table_engine = RoutingTableEngine(self) + self.binding_engine = BindingEngine(self) + self.adapter_engine = AdapterEngine(self) + + ## + ## Establish the local bindings so that this router instance can receive + ## traffic addressed to it + ## + self.adapter.local_bind('router') + self.adapter.local_bind('_topo/%s/%s' % (self.area, self.id)) + self.adapter.local_bind('_topo/%s/all' % self.area) + + + ##======================================================================================== + ## Adapter Entry Points - invoked from the adapter + ##======================================================================================== + def getId(self): + """ + Return the router's ID + """ + return self.id + + + def addLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.add_local_address(key) + except Exception, e: + self.log(ERROR, "Exception in new-address processing: exception=%r" % e) + + def delLocalAddress(self, key): + """ + """ + try: + if key.find('_topo') == 0 or key.find('_local') == 0: + return + self.mobile_address_engine.del_local_address(key) + except Exception, e: + self.log(ERROR, "Exception in del-address processing: exception=%r" % e) + + + def handleTimerTick(self): + """ + """ + try: + now = time() + self.neighbor_engine.tick(now) + self.link_state_engine.tick(now) + self.path_engine.tick(now) + self.mobile_address_engine.tick(now) + self.routing_table_engine.tick(now) + self.binding_engine.tick(now) + self.adapter_engine.tick(now) + except Exception, e: + self.log(ERROR, "Exception in timer processing: exception=%r" % e) + + + def handleControlMessage(self, opcode, body): + """ + """ + try: + now = time() + if opcode == 'HELLO': + msg = MessageHELLO(body) + self.log(TRACE, "RCVD: %r" % msg) + self.neighbor_engine.handle_hello(msg, now) + + elif opcode == 'RA': + msg = MessageRA(body) + self.log(TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_ra(msg, now) + self.mobile_address_engine.handle_ra(msg, now) + + elif opcode == 'LSU': + msg = MessageLSU(body) + self.log(TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsu(msg, now) + + elif opcode == 'LSR': + msg = MessageLSR(body) + self.log(TRACE, "RCVD: %r" % msg) + self.link_state_engine.handle_lsr(msg, now) + + elif opcode == 'MAU': + msg = MessageMAU(body) + self.log(TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mau(msg, now) + + elif opcode == 'MAR': + msg = MessageMAR(body) + self.log(TRACE, "RCVD: %r" % msg) + self.mobile_address_engine.handle_mar(msg, now) + + except Exception, e: + self.log(ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + + + def getRouterData(self, kind): + """ + """ + if kind == 'help': + return { 'help' : "Get list of supported values for kind", + 'link-state' : "This router's link state", + 'link-state-set' : "The set of link states from known routers", + 'next-hops' : "Next hops to each known router", + 'topo-table' : "Topological routing table", + 'mobile-table' : "Mobile key routing table" + } + if kind == 'link-state' : return self.neighbor_engine.link_state.to_dict() + if kind == 'next-hops' : return self.routing_table_engine.next_hops + if kind == 'topo-table' : return {'table': self.adapter_engine.key_classes['topological']} + if kind == 'mobile-table' : return {'table': self.adapter_engine.key_classes['mobile-key']} + if kind == 'link-state-set' : + copy = {} + for _id,_ls in self.link_state_engine.collection.items(): + copy[_id] = _ls.to_dict() + return copy + + return {'notice':'Use kind="help" to get a list of possibilities'} + + + ##======================================================================================== + ## Adapter Calls - outbound calls to the adapter + ##======================================================================================== + def log(self, level, text): + """ + Emit a log message to the host's event log + """ + self.adapter.log(level, text) + + + def send(self, dest, msg): + """ + Send a control message to another router. + """ + self.adapter.send(dest, msg.get_opcode(), msg.to_dict()) + self.log(TRACE, "SENT: %r dest=%s" % (msg, dest)) + + + ##======================================================================================== + ## Interconnect between the Sub-Modules + ##======================================================================================== + def local_link_state_changed(self, link_state): + self.log(DEBUG, "Event: local_link_state_changed: %r" % link_state) + self.link_state_engine.new_local_link_state(link_state) + + def ls_collection_changed(self, collection): + self.log(DEBUG, "Event: ls_collection_changed: %r" % collection) + self.path_engine.ls_collection_changed(collection) + + def next_hops_changed(self, next_hop_table): + self.log(DEBUG, "Event: next_hops_changed: %r" % next_hop_table) + self.routing_table_engine.next_hops_changed(next_hop_table) + self.binding_engine.next_hops_changed() + + def mobile_sequence_changed(self, mobile_seq): + self.log(DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) + self.link_state_engine.set_mobile_sequence(mobile_seq) + + def mobile_keys_changed(self, keys): + self.log(DEBUG, "Event: mobile_keys_changed: %r" % keys) + self.binding_engine.mobile_keys_changed(keys) + + def get_next_hops(self): + return self.routing_table_engine.get_next_hops() + + def remote_routes_changed(self, key_class, routes): + self.log(DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) + self.adapter_engine.remote_routes_changed(key_class, routes) + diff --git a/qpid/extras/dispatch/src/py/router/routing.py b/qpid/extras/dispatch/src/py/router/routing.py new file mode 100644 index 0000000000..439da66653 --- /dev/null +++ b/qpid/extras/dispatch/src/py/router/routing.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +TRACE = 0 +DEBUG = 1 +INFO = 2 +NOTICE = 3 +WARNING = 4 +ERROR = 5 +CRITICAL = 6 + +class RoutingTableEngine(object): + """ + This module is responsible for converting the set of next hops to remote routers to a routing + table in the "topological" address class. + """ + def __init__(self, container): + self.container = container + self.id = self.container.id + self.area = self.container.area + self.next_hops = {} + + + def tick(self, now): + pass + + + def next_hops_changed(self, next_hops): + # Convert next_hops into routing table + self.next_hops = next_hops + new_table = [] + for _id, next_hop in next_hops.items(): + new_table.append(('_topo.%s.%s.#' % (self.area, _id), next_hop)) + pair = ('_topo.%s.all' % (self.area), next_hop) + if new_table.count(pair) == 0: + new_table.append(pair) + + self.container.remote_routes_changed('topological', new_table) + + + def get_next_hops(self): + return self.next_hops + diff --git a/qpid/extras/dispatch/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt index 3d90f163b3..351a0e4764 100644 --- a/qpid/extras/dispatch/tests/CMakeLists.txt +++ b/qpid/extras/dispatch/tests/CMakeLists.txt @@ -49,3 +49,4 @@ add_test(unit_tests_size_3 unit_tests_size 3) add_test(unit_tests_size_2 unit_tests_size 2) add_test(unit_tests_size_1 unit_tests_size 1) add_test(unit_tests unit_tests ${CMAKE_CURRENT_SOURCE_DIR}/threads4.conf) +add_test(router_tests python ${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v) diff --git a/qpid/extras/dispatch/tests/router_engine_test.py b/qpid/extras/dispatch/tests/router_engine_test.py new file mode 100644 index 0000000000..ff3acb3850 --- /dev/null +++ b/qpid/extras/dispatch/tests/router_engine_test.py @@ -0,0 +1,410 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from router.router_engine import NeighborEngine, PathEngine, Configuration +from router.data import LinkState, MessageHELLO + +class Adapter(object): + def __init__(self, domain): + self._domain = domain + + def log(self, level, text): + print "Adapter.log(%d): domain=%s, text=%s" % (level, self._domain, text) + + def send(self, dest, opcode, body): + print "Adapter.send: domain=%s, dest=%s, opcode=%s, body=%s" % (self._domain, dest, opcode, body) + + def local_bind(self, key): + print "Adapter.local_bind: key=%s" % key + + def remote_bind(self, subject, peer): + print "Adapter.remote_bind: subject=%s, peer=%s" % (subject, peer) + + def remote_unbind(self, subject, peer): + print "Adapter.remote_unbind: subject=%s, peer=%s" % (subject, peer) + + def remote_rebind(self, subject, old_peer, new_peer): + print "Adapter.remote_rebind: subject=%s, old_peer=%s, new_peer=%s" % (subject, old_peer, new_peer) + + +class DataTest(unittest.TestCase): + def test_link_state(self): + ls = LinkState(None, 'R1', 'area', 1, ['R2', 'R3']) + self.assertEqual(ls.id, 'R1') + self.assertEqual(ls.area, 'area') + self.assertEqual(ls.ls_seq, 1) + self.assertEqual(ls.peers, ['R2', 'R3']) + ls.bump_sequence() + self.assertEqual(ls.id, 'R1') + self.assertEqual(ls.area, 'area') + self.assertEqual(ls.ls_seq, 2) + self.assertEqual(ls.peers, ['R2', 'R3']) + + result = ls.add_peer('R4') + self.assertTrue(result) + self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) + result = ls.add_peer('R2') + self.assertFalse(result) + self.assertEqual(ls.peers, ['R2', 'R3', 'R4']) + + result = ls.del_peer('R3') + self.assertTrue(result) + self.assertEqual(ls.peers, ['R2', 'R4']) + result = ls.del_peer('R5') + self.assertFalse(result) + self.assertEqual(ls.peers, ['R2', 'R4']) + + encoded = ls.to_dict() + new_ls = LinkState(encoded) + self.assertEqual(new_ls.id, 'R1') + self.assertEqual(new_ls.area, 'area') + self.assertEqual(new_ls.ls_seq, 2) + self.assertEqual(new_ls.peers, ['R2', 'R4']) + + + def test_hello_message(self): + msg1 = MessageHELLO(None, 'R1', 'area', ['R2', 'R3', 'R4']) + self.assertEqual(msg1.get_opcode(), "HELLO") + self.assertEqual(msg1.id, 'R1') + self.assertEqual(msg1.area, 'area') + self.assertEqual(msg1.seen_peers, ['R2', 'R3', 'R4']) + encoded = msg1.to_dict() + msg2 = MessageHELLO(encoded) + self.assertEqual(msg2.get_opcode(), "HELLO") + self.assertEqual(msg2.id, 'R1') + self.assertEqual(msg2.area, 'area') + self.assertEqual(msg2.seen_peers, ['R2', 'R3', 'R4']) + self.assertTrue(msg2.is_seen('R3')) + self.assertFalse(msg2.is_seen('R9')) + + + +class NeighborTest(unittest.TestCase): + def log(self, level, text): + pass + + def send(self, dest, msg): + self.sent.append((dest, msg)) + + def local_link_state_changed(self, link_state): + self.local_link_state = link_state + + def setUp(self): + self.sent = [] + self.local_link_state = None + self.id = "R1" + self.area = "area" + self.config = Configuration() + + def test_hello_sent(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.tick(0.5) + self.assertEqual(self.sent, []) + self.engine.tick(1.5) + self.assertEqual(len(self.sent), 1) + dest, msg = self.sent.pop(0) + self.assertEqual(dest, "_peer") + self.assertEqual(msg.get_opcode(), "HELLO") + self.assertEqual(msg.id, self.id) + self.assertEqual(msg.area, self.area) + self.assertEqual(msg.seen_peers, []) + self.assertEqual(self.local_link_state, None) + + def test_sees_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', []), 2.0) + self.engine.tick(5.0) + self.assertEqual(len(self.sent), 1) + dest, msg = self.sent.pop(0) + self.assertEqual(msg.seen_peers, ['R2']) + + def test_establish_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5) + self.engine.tick(1.0) + self.engine.tick(2.0) + self.engine.tick(3.0) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 1) + self.assertEqual(self.local_link_state.peers, ['R2']) + + def test_establish_multiple_peers(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R1']), 0.5) + self.engine.tick(1.0) + self.engine.handle_hello(MessageHELLO(None, 'R3', 'area', ['R1', 'R2']), 1.5) + self.engine.tick(2.0) + self.engine.handle_hello(MessageHELLO(None, 'R4', 'area', ['R1']), 2.5) + self.engine.handle_hello(MessageHELLO(None, 'R5', 'area', ['R2']), 2.5) + self.engine.handle_hello(MessageHELLO(None, 'R6', 'area', ['R1']), 2.5) + self.engine.tick(3.0) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 3) + self.local_link_state.peers.sort() + self.assertEqual(self.local_link_state.peers, ['R2', 'R3', 'R4', 'R6']) + + def test_timeout_peer(self): + self.sent = [] + self.local_link_state = None + self.engine = NeighborEngine(self) + self.engine.handle_hello(MessageHELLO(None, 'R2', 'area', ['R3', 'R1']), 2.0) + self.engine.tick(5.0) + self.engine.tick(17.1) + self.assertEqual(self.local_link_state.id, 'R1') + self.assertEqual(self.local_link_state.area, 'area') + self.assertEqual(self.local_link_state.ls_seq, 2) + self.assertEqual(self.local_link_state.peers, []) + + +class PathTest(unittest.TestCase): + def setUp(self): + self.id = 'R1' + self.area = 'area' + self.next_hops = None + self.engine = PathEngine(self) + + def log(self, level, text): + pass + + def next_hops_changed(self, nh): + self.next_hops = nh + + def test_topology1(self): + """ + + +====+ +----+ +----+ + | R1 |------| R2 |------| R3 | + +====+ +----+ +----+ + + """ + collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), + 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R2']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 2) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R2') + + def test_topology2(self): + """ + + +====+ +----+ +----+ + | R1 |------| R2 |------| R4 | + +====+ +----+ +----+ + | | + +----+ +----+ +----+ + | R3 |------| R5 |------| R6 | + +----+ +----+ +----+ + + """ + collection = { 'R1': LinkState(None, 'R1', 'area', 1, ['R2']), + 'R2': LinkState(None, 'R2', 'area', 1, ['R1', 'R3', 'R4']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R2', 'R5']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R2', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R3', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 5) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R2') + self.assertEqual(self.next_hops['R4'], 'R2') + self.assertEqual(self.next_hops['R5'], 'R2') + self.assertEqual(self.next_hops['R6'], 'R2') + + def test_topology3(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | + +====+ +----+ +----+ + | R1 |------| R5 |------| R6 | + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 5) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + + def test_topology4(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | + +====+ +----+ +----+ + | R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + | | | + | +====+ +----+ +----+ + +--------| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry1(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + ^ | | + ^ +====+ +----+ +----+ + +-<-<-<--| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5', 'R2']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R2') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry2(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + v | | + v +====+ +----+ +----+ + +->->->->| R1 |------| R5 |------| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4', 'R6']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 6) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + self.assertEqual(self.next_hops['R6'], 'R5') + self.assertEqual(self.next_hops['R7'], 'R5') + + def test_topology5_with_asymmetry3(self): + """ + + +----+ +----+ +----+ + | R2 |------| R3 |------| R4 | + +----+ +----+ +----+ + v | | + v +====+ +----+ +----+ + +->->->->| R1 |------| R5 |<-<-<-| R6 |------ R7 (no ls from R7) + +====+ +----+ +----+ + + """ + collection = { 'R2': LinkState(None, 'R2', 'area', 1, ['R3', 'R1']), + 'R3': LinkState(None, 'R3', 'area', 1, ['R1', 'R2', 'R4']), + 'R4': LinkState(None, 'R4', 'area', 1, ['R3', 'R5']), + 'R1': LinkState(None, 'R1', 'area', 1, ['R3', 'R5']), + 'R5': LinkState(None, 'R5', 'area', 1, ['R1', 'R4']), + 'R6': LinkState(None, 'R6', 'area', 1, ['R5', 'R7']) } + self.engine.ls_collection_changed(collection) + self.engine.tick(1.0) + self.assertEqual(len(self.next_hops), 4) + self.assertEqual(self.next_hops['R2'], 'R3') + self.assertEqual(self.next_hops['R3'], 'R3') + self.assertEqual(self.next_hops['R4'], 'R3') + self.assertEqual(self.next_hops['R5'], 'R5') + + +if __name__ == '__main__': + unittest.main() |
