diff options
| author | Ted Ross <tross@apache.org> | 2013-08-08 12:17:23 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-08-08 12:17:23 +0000 |
| commit | bf9cb6739b8f49a866a3770df587361374ef8d04 (patch) | |
| tree | 12d9e4a233cdfb0b21e45ea21773e6aabbaf47da /qpid/extras/dispatch | |
| parent | ccd770b8d4cb231ad43a55fd83dbf577f67b4aec (diff) | |
| download | qpid-python-bf9cb6739b8f49a866a3770df587361374ef8d04.tar.gz | |
QPID-5045 - Added system tests for the routing scenarios, fixed discovered defects.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1511737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch')
| -rw-r--r-- | qpid/extras/dispatch/src/container.c | 6 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 26 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/onerouter.conf | 54 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/system_tests_one_router.py | 191 |
5 files changed, 268 insertions, 14 deletions
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c index 4a38cfc69c..d4766f0432 100644 --- a/qpid/extras/dispatch/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -326,14 +326,12 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio // // Step 2.5: Traverse all of the links on the connection looking for - // outgoing links with non-zero credit. Call the attached node's - // writable handler for such links. + // links. Call the attached node's writable handler for such links. // pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); while (pn_link) { assert(pn_session_connection(pn_link_session(pn_link)) == conn); - if (pn_link_is_sender(pn_link)) - event_count += do_writable(pn_link); + event_count += do_writable(pn_link); pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 1c5e7c266c..0bab382b52 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -228,10 +228,14 @@ static int router_writable_link_handler(void* context, dx_link_t *link) DEQ_REMOVE_HEAD(events); if (re->delivery) { - if (re->disposition) + if (re->disposition) { pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition); - if (re->settle) + event_count++; + } + if (re->settle) { dx_delivery_free(re->delivery, 0); + event_count++; + } } free_dx_routed_event_t(re); @@ -1096,14 +1100,16 @@ static void dx_pyrouter_tick(dx_router_t *router) PyObject *pArgs; PyObject *pValue; - pArgs = PyTuple_New(0); - pValue = PyObject_CallObject(router->pyTick, pArgs); - if (PyErr_Occurred()) { - PyErr_Print(); - } - Py_DECREF(pArgs); - if (pValue) { - Py_DECREF(pValue); + if (router->pyTick) { + pArgs = PyTuple_New(0); + pValue = PyObject_CallObject(router->pyTick, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } } } diff --git a/qpid/extras/dispatch/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt index 9523ab9ad4..b70972b37b 100644 --- a/qpid/extras/dispatch/tests/CMakeLists.txt +++ b/qpid/extras/dispatch/tests/CMakeLists.txt @@ -51,3 +51,8 @@ add_test(unit_tests_size_2 unit_tests_size 2) add_test(unit_tests_size_1 unit_tests_size 1) add_test(unit_tests unit_tests ${CMAKE_CURRENT_SOURCE_DIR}/threads4.conf) add_test(router_tests python ${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v) +add_test(system_tests_single python ${CMAKE_CURRENT_SOURCE_DIR}/system_tests_one_router.py -v) + +set_property(TEST system_tests_single PROPERTY + ENVIRONMENT "CTEST_SOURCE_DIR=${CMAKE_CURRENT_SOURCE_DIR}" + ) diff --git a/qpid/extras/dispatch/tests/onerouter.conf b/qpid/extras/dispatch/tests/onerouter.conf new file mode 100644 index 0000000000..308d86473c --- /dev/null +++ b/qpid/extras/dispatch/tests/onerouter.conf @@ -0,0 +1,54 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License +## + + +## +## Container section - Configures the general operation of the AMQP container. +## +container { + ## + ## worker-threads - The number of threads that will be created to + ## process message traffic and other application work (timers, non-amqp + ## file descriptors, etc.) + ## + ## The number of threads should be related to the number of available + ## processor cores. To fully utilize a quad-core system, set the + ## number of threads to 4. + ## + worker-threads: 4 + + ## + ## container-name - The name of the AMQP container. If not specified, + ## the container name will be set to a value of the container's + ## choosing. The automatically assigned container name is not + ## guaranteed to be persistent across restarts of the container. + ## + container-name: Qpid.Dispatch.Router.A +} + + +## +## Listeners and Connectors +## +listener { + addr: 0.0.0.0 + port: 20000 + sasl-mechanisms: ANONYMOUS +} + diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py new file mode 100644 index 0000000000..bb9f757cfa --- /dev/null +++ b/qpid/extras/dispatch/tests/system_tests_one_router.py @@ -0,0 +1,191 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import time +import unittest +import subprocess +from proton import Messenger, Message, PENDING, ACCEPTED, REJECTED + +class RouterTest(unittest.TestCase): + + def setUp(self): + if 'CTEST_SOURCE_DIR' not in os.environ: + raise Exception("Environment variable 'CTEST_SOURCE_DIR' not set") + srcdir = os.environ['CTEST_SOURCE_DIR'] + self.router = subprocess.Popen(['../router/dispatch-router', '-c', '%s/onerouter.conf' % srcdir], + stderr=subprocess.PIPE, stdout=subprocess.PIPE) + time.sleep(1) + + def tearDown(self): + self.router.terminate() + self.router.wait() + + def subscribe(self, messenger, address): + messenger.subscribe(address) + while messenger.work(100): + pass + + def test_0_discard(self): + addr = "amqp://0.0.0.0:20000/discard/1" + M1 = Messenger() + M1.timeout = 1000 + M1.start() + tm = Message() + tm.address = addr + for i in range(100): + tm.body = {'number': i} + M1.put(tm) + M1.send() + M1.stop() + + + def test_1_pre_settled(self): + addr = "amqp://0.0.0.0:20000/pre_settled/1" + M1 = Messenger() + M2 = Messenger() + + M1.timeout = 1000 + M2.timeout = 1000 + + M1.start() + M2.start() + self.subscribe(M2, addr) + + tm = Message() + rm = Message() + + tm.address = addr + for i in range(100): + tm.body = {'number': i} + M1.put(tm) + M1.send() + + for i in range(100): + M2.recv(1) + M2.get(rm) + self.assertEqual(i, rm.body['number']) + + M1.stop() + M2.stop() + + + def test_2_multicast(self): + addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1" + M1 = Messenger() + M2 = Messenger() + M3 = Messenger() + M4 = Messenger() + + M1.timeout = 1000 + M2.timeout = 1000 + M3.timeout = 1000 + M4.timeout = 1000 + + M1.start() + M2.start() + M3.start() + M4.start() + self.subscribe(M2, addr) + self.subscribe(M3, addr) + self.subscribe(M4, addr) + + tm = Message() + rm = Message() + + tm.address = addr + for i in range(100): + tm.body = {'number': i} + M1.put(tm) + M1.send() + + for i in range(100): + M2.recv(1) + M2.get(rm) + self.assertEqual(i, rm.body['number']) + + M3.recv(1) + M3.get(rm) + self.assertEqual(i, rm.body['number']) + + M4.recv(1) + M4.get(rm) + self.assertEqual(i, rm.body['number']) + + M1.stop() + M2.stop() + M3.stop() + M4.stop() + + + def test_3_propagated_disposition(self): + addr = "amqp://0.0.0.0:20000/unsettled/1" + M1 = Messenger() + M2 = Messenger() + + M1.timeout = 1000 + M2.timeout = 1000 + M1.outgoing_window = 5 + M2.incoming_window = 5 + + M1.start() + M2.start() + self.subscribe(M2, addr) + + tm = Message() + rm = Message() + + tm.address = addr + tm.body = {'number': 0} + tx_tracker = M1.put(tm) + M1.send(0) + + M2.recv(1) + rx_tracker = M2.get(rm) + self.assertEqual(0, rm.body['number']) + self.assertEqual(PENDING, M1.status(tx_tracker)) + + M2.accept(rx_tracker) + M2.settle(rx_tracker) + + while M2.work(100): + pass + while M1.work(100): + pass + + self.assertEqual(ACCEPTED, M1.status(tx_tracker)) + + M1.stop() + M2.stop() + + +# def test_4_unsettled_undeliverable(self): +# pass + +# def test_4_three_ack(self): +# pass + +# def test_5_link_route_sender(self): +# pass + +# def test_6_link_route_receiver(self): +# pass + +if __name__ == '__main__': + unittest.main() |
