summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-08-08 12:17:23 +0000
committerTed Ross <tross@apache.org>2013-08-08 12:17:23 +0000
commitbf9cb6739b8f49a866a3770df587361374ef8d04 (patch)
tree12d9e4a233cdfb0b21e45ea21773e6aabbaf47da /qpid/extras/dispatch
parentccd770b8d4cb231ad43a55fd83dbf577f67b4aec (diff)
downloadqpid-python-bf9cb6739b8f49a866a3770df587361374ef8d04.tar.gz
QPID-5045 - Added system tests for the routing scenarios, fixed discovered defects.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1511737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch')
-rw-r--r--qpid/extras/dispatch/src/container.c6
-rw-r--r--qpid/extras/dispatch/src/router_node.c26
-rw-r--r--qpid/extras/dispatch/tests/CMakeLists.txt5
-rw-r--r--qpid/extras/dispatch/tests/onerouter.conf54
-rw-r--r--qpid/extras/dispatch/tests/system_tests_one_router.py191
5 files changed, 268 insertions, 14 deletions
diff --git a/qpid/extras/dispatch/src/container.c b/qpid/extras/dispatch/src/container.c
index 4a38cfc69c..d4766f0432 100644
--- a/qpid/extras/dispatch/src/container.c
+++ b/qpid/extras/dispatch/src/container.c
@@ -326,14 +326,12 @@ static int process_handler(dx_container_t *container, void* unused, pn_connectio
//
// Step 2.5: Traverse all of the links on the connection looking for
- // outgoing links with non-zero credit. Call the attached node's
- // writable handler for such links.
+ // links. Call the attached node's writable handler for such links.
//
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (pn_link) {
assert(pn_session_connection(pn_link_session(pn_link)) == conn);
- if (pn_link_is_sender(pn_link))
- event_count += do_writable(pn_link);
+ event_count += do_writable(pn_link);
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 1c5e7c266c..0bab382b52 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -228,10 +228,14 @@ static int router_writable_link_handler(void* context, dx_link_t *link)
DEQ_REMOVE_HEAD(events);
if (re->delivery) {
- if (re->disposition)
+ if (re->disposition) {
pn_delivery_update(dx_delivery_pn(re->delivery), re->disposition);
- if (re->settle)
+ event_count++;
+ }
+ if (re->settle) {
dx_delivery_free(re->delivery, 0);
+ event_count++;
+ }
}
free_dx_routed_event_t(re);
@@ -1096,14 +1100,16 @@ static void dx_pyrouter_tick(dx_router_t *router)
PyObject *pArgs;
PyObject *pValue;
- pArgs = PyTuple_New(0);
- pValue = PyObject_CallObject(router->pyTick, pArgs);
- if (PyErr_Occurred()) {
- PyErr_Print();
- }
- Py_DECREF(pArgs);
- if (pValue) {
- Py_DECREF(pValue);
+ if (router->pyTick) {
+ pArgs = PyTuple_New(0);
+ pValue = PyObject_CallObject(router->pyTick, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
}
diff --git a/qpid/extras/dispatch/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt
index 9523ab9ad4..b70972b37b 100644
--- a/qpid/extras/dispatch/tests/CMakeLists.txt
+++ b/qpid/extras/dispatch/tests/CMakeLists.txt
@@ -51,3 +51,8 @@ add_test(unit_tests_size_2 unit_tests_size 2)
add_test(unit_tests_size_1 unit_tests_size 1)
add_test(unit_tests unit_tests ${CMAKE_CURRENT_SOURCE_DIR}/threads4.conf)
add_test(router_tests python ${CMAKE_CURRENT_SOURCE_DIR}/router_engine_test.py -v)
+add_test(system_tests_single python ${CMAKE_CURRENT_SOURCE_DIR}/system_tests_one_router.py -v)
+
+set_property(TEST system_tests_single PROPERTY
+ ENVIRONMENT "CTEST_SOURCE_DIR=${CMAKE_CURRENT_SOURCE_DIR}"
+ )
diff --git a/qpid/extras/dispatch/tests/onerouter.conf b/qpid/extras/dispatch/tests/onerouter.conf
new file mode 100644
index 0000000000..308d86473c
--- /dev/null
+++ b/qpid/extras/dispatch/tests/onerouter.conf
@@ -0,0 +1,54 @@
+##
+## Licensed to the Apache Software Foundation (ASF) under one
+## or more contributor license agreements. See the NOTICE file
+## distributed with this work for additional information
+## regarding copyright ownership. The ASF licenses this file
+## to you under the Apache License, Version 2.0 (the
+## "License"); you may not use this file except in compliance
+## with the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing,
+## software distributed under the License is distributed on an
+## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+## KIND, either express or implied. See the License for the
+## specific language governing permissions and limitations
+## under the License
+##
+
+
+##
+## Container section - Configures the general operation of the AMQP container.
+##
+container {
+ ##
+ ## worker-threads - The number of threads that will be created to
+ ## process message traffic and other application work (timers, non-amqp
+ ## file descriptors, etc.)
+ ##
+ ## The number of threads should be related to the number of available
+ ## processor cores. To fully utilize a quad-core system, set the
+ ## number of threads to 4.
+ ##
+ worker-threads: 4
+
+ ##
+ ## container-name - The name of the AMQP container. If not specified,
+ ## the container name will be set to a value of the container's
+ ## choosing. The automatically assigned container name is not
+ ## guaranteed to be persistent across restarts of the container.
+ ##
+ container-name: Qpid.Dispatch.Router.A
+}
+
+
+##
+## Listeners and Connectors
+##
+listener {
+ addr: 0.0.0.0
+ port: 20000
+ sasl-mechanisms: ANONYMOUS
+}
+
diff --git a/qpid/extras/dispatch/tests/system_tests_one_router.py b/qpid/extras/dispatch/tests/system_tests_one_router.py
new file mode 100644
index 0000000000..bb9f757cfa
--- /dev/null
+++ b/qpid/extras/dispatch/tests/system_tests_one_router.py
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import time
+import unittest
+import subprocess
+from proton import Messenger, Message, PENDING, ACCEPTED, REJECTED
+
+class RouterTest(unittest.TestCase):
+
+ def setUp(self):
+ if 'CTEST_SOURCE_DIR' not in os.environ:
+ raise Exception("Environment variable 'CTEST_SOURCE_DIR' not set")
+ srcdir = os.environ['CTEST_SOURCE_DIR']
+ self.router = subprocess.Popen(['../router/dispatch-router', '-c', '%s/onerouter.conf' % srcdir],
+ stderr=subprocess.PIPE, stdout=subprocess.PIPE)
+ time.sleep(1)
+
+ def tearDown(self):
+ self.router.terminate()
+ self.router.wait()
+
+ def subscribe(self, messenger, address):
+ messenger.subscribe(address)
+ while messenger.work(100):
+ pass
+
+ def test_0_discard(self):
+ addr = "amqp://0.0.0.0:20000/discard/1"
+ M1 = Messenger()
+ M1.timeout = 1000
+ M1.start()
+ tm = Message()
+ tm.address = addr
+ for i in range(100):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send()
+ M1.stop()
+
+
+ def test_1_pre_settled(self):
+ addr = "amqp://0.0.0.0:20000/pre_settled/1"
+ M1 = Messenger()
+ M2 = Messenger()
+
+ M1.timeout = 1000
+ M2.timeout = 1000
+
+ M1.start()
+ M2.start()
+ self.subscribe(M2, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ for i in range(100):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send()
+
+ for i in range(100):
+ M2.recv(1)
+ M2.get(rm)
+ self.assertEqual(i, rm.body['number'])
+
+ M1.stop()
+ M2.stop()
+
+
+ def test_2_multicast(self):
+ addr = "amqp://0.0.0.0:20000/pre_settled/multicast/1"
+ M1 = Messenger()
+ M2 = Messenger()
+ M3 = Messenger()
+ M4 = Messenger()
+
+ M1.timeout = 1000
+ M2.timeout = 1000
+ M3.timeout = 1000
+ M4.timeout = 1000
+
+ M1.start()
+ M2.start()
+ M3.start()
+ M4.start()
+ self.subscribe(M2, addr)
+ self.subscribe(M3, addr)
+ self.subscribe(M4, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ for i in range(100):
+ tm.body = {'number': i}
+ M1.put(tm)
+ M1.send()
+
+ for i in range(100):
+ M2.recv(1)
+ M2.get(rm)
+ self.assertEqual(i, rm.body['number'])
+
+ M3.recv(1)
+ M3.get(rm)
+ self.assertEqual(i, rm.body['number'])
+
+ M4.recv(1)
+ M4.get(rm)
+ self.assertEqual(i, rm.body['number'])
+
+ M1.stop()
+ M2.stop()
+ M3.stop()
+ M4.stop()
+
+
+ def test_3_propagated_disposition(self):
+ addr = "amqp://0.0.0.0:20000/unsettled/1"
+ M1 = Messenger()
+ M2 = Messenger()
+
+ M1.timeout = 1000
+ M2.timeout = 1000
+ M1.outgoing_window = 5
+ M2.incoming_window = 5
+
+ M1.start()
+ M2.start()
+ self.subscribe(M2, addr)
+
+ tm = Message()
+ rm = Message()
+
+ tm.address = addr
+ tm.body = {'number': 0}
+ tx_tracker = M1.put(tm)
+ M1.send(0)
+
+ M2.recv(1)
+ rx_tracker = M2.get(rm)
+ self.assertEqual(0, rm.body['number'])
+ self.assertEqual(PENDING, M1.status(tx_tracker))
+
+ M2.accept(rx_tracker)
+ M2.settle(rx_tracker)
+
+ while M2.work(100):
+ pass
+ while M1.work(100):
+ pass
+
+ self.assertEqual(ACCEPTED, M1.status(tx_tracker))
+
+ M1.stop()
+ M2.stop()
+
+
+# def test_4_unsettled_undeliverable(self):
+# pass
+
+# def test_4_three_ack(self):
+# pass
+
+# def test_5_link_route_sender(self):
+# pass
+
+# def test_6_link_route_receiver(self):
+# pass
+
+if __name__ == '__main__':
+ unittest.main()