diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
| commit | afefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch) | |
| tree | 70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/tests | |
| parent | 1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff) | |
| download | qpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz | |
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control this
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/Makefile.am | 5 | ||||
| -rw-r--r-- | cpp/src/tests/ProxyTest.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/tests/QueueEvents.cpp | 4 | ||||
| -rwxr-xr-x | cpp/src/tests/federation.py | 23 | ||||
| -rwxr-xr-x | cpp/src/tests/reliable_replication_test | 98 | ||||
| -rwxr-xr-x | cpp/src/tests/replication_test | 19 |
6 files changed, 178 insertions, 23 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 314b90ba8b..ff2ee060c2 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -89,7 +89,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ManagementTest.cpp \ MessageReplayTracker.cpp \ ConsoleTest.cpp \ - QueueEvents.cpp + QueueEvents.cpp \ + ProxyTest.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -249,7 +250,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers) # Longer running stability tests, not run by default check: target. # Not run under valgrind, too slow -LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak +LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test EXTRA_DIST+=$(LONG_TESTS) run_perftest check-long: $(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND= diff --git a/cpp/src/tests/ProxyTest.cpp b/cpp/src/tests/ProxyTest.cpp new file mode 100644 index 0000000000..9007f3dc97 --- /dev/null +++ b/cpp/src/tests/ProxyTest.cpp @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/ExecutionSyncBody.h" +#include "qpid/framing/Proxy.h" +#include <alloca.h> + +#include "unit_test.h" + +using namespace qpid::framing; + +QPID_AUTO_TEST_SUITE(ProxyTestSuite) + + +QPID_AUTO_TEST_CASE(testScopedSync) +{ + struct DummyHandler : FrameHandler + { + void handle(AMQFrame& f) { + AMQMethodBody* m = f.getMethod(); + BOOST_CHECK(m); + BOOST_CHECK(m->isA<ExecutionSyncBody>()); + BOOST_CHECK(m->isSync()); + } + }; + DummyHandler f; + Proxy p(f); + Proxy::ScopedSync s(p); + p.send(ExecutionSyncBody(p.getVersion())); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp index 7aea23922d..f6b76b69ba 100644 --- a/cpp/src/tests/QueueEvents.cpp +++ b/cpp/src/tests/QueueEvents.cpp @@ -168,7 +168,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); } fixture.connection.close(); - fixture.shutdownBroker(); + fixture.broker->getQueueEvents().shutdown(); //check listener was notified of all events, and in correct order SequenceNumber enqueueId(1); @@ -215,7 +215,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly) BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); } fixture.connection.close(); - fixture.shutdownBroker(); + fixture.broker->getQueueEvents().shutdown(); //check listener was notified of all events, and in correct order SequenceNumber enqueueId(1); diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index ad82964007..9b0be8f979 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -64,7 +64,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False) + result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -88,7 +88,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False) + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -135,7 +135,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False) + result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -195,7 +195,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False) + result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -244,8 +244,8 @@ class FederationTests(TestBase010): l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0] r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0] - l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) - r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False) + l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) + r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0) self.assertEqual(l_res.status, 0) self.assertEqual(r_res.status, 0) @@ -296,7 +296,7 @@ class FederationTests(TestBase010): link = qmf.getObjects(_class="link")[0] result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id", - "exclude-me,also-exclude-me", False, False, False) + "exclude-me,also-exclude-me", False, False, False, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] @@ -354,7 +354,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True) + result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -401,7 +401,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True) + result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -448,7 +448,7 @@ class FederationTests(TestBase010): self.assertEqual(result.status, 0) link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True) + result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0) self.assertEqual(result.status, 0) bridge = qmf.getObjects(_class="bridge")[0] sleep(5) @@ -478,8 +478,7 @@ class FederationTests(TestBase010): sleep(3) self.assertEqual(len(qmf.getObjects(_class="bridge")), 0) self.assertEqual(len(qmf.getObjects(_class="link")), 0) - - + def getProperty(self, msg, name): for h in msg.headers: if hasattr(h, name): return getattr(h, name) diff --git a/cpp/src/tests/reliable_replication_test b/cpp/src/tests/reliable_replication_test new file mode 100755 index 0000000000..1a2fa917b3 --- /dev/null +++ b/cpp/src/tests/reliable_replication_test @@ -0,0 +1,98 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Test reliability of the replication feature in the face of link +# failures: +MY_DIR=`dirname \`which $0\`` +PYTHON_DIR=${MY_DIR}/../../../python + +trap stop_brokers EXIT + +stop_brokers() { + if [[ $BROKER_A ]] ; then + ../qpidd -q --port $BROKER_A + unset BROKER_A + fi + if [[ $BROKER_B ]] ; then + ../qpidd -q --port $BROKER_B + unset BROKER_B + fi +} + +setup() { + rm -f replication-source.log replication-dest.log + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port + BROKER_A=`cat qpidd.port` + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port + BROKER_B=`cat qpidd.port` + + #../qpidd --port 5555 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 & + #BROKER_A=5555 + + #../qpidd --port 6666 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 & + #BROKER_B=6666 + echo "Testing replication from port $BROKER_A to port $BROKER_B" + export PYTHONPATH=$PYTHON_DIR + + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication + $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + + #create test queue (only replicate enqueues for this test): + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1 + $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a +} + +send() { + ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < replicated.expected +} + +receive() { + rm -f replicated.actual + ./receiver --port $BROKER_B --queue queue-a > replicated.actual +} + +bounce_link() { + echo "Destroying link..." + $PYTHON_DIR/commands/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A" + echo "Link destroyed; recreating route..." + sleep 2 + $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + echo "Route re-established" +} + +if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then + setup + for i in `seq 1 100000`; do echo Message $i; done > replicated.expected + send & + receive & + for i in `seq 1 5`; do sleep 10; bounce_link; done; + wait + #check that received list is identical to sent list + diff replicated.actual replicated.expected || FAIL=1 + if [[ $FAIL ]]; then + echo reliable replication test failed: expectations not met! + else + echo replication reliable in the face of link failures + rm -f replication.actual replication.expected replication-source.log replication-dest.log + fi +fi + diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test index 931078c047..9b6e5cfb29 100755 --- a/cpp/src/tests/replication_test +++ b/cpp/src/tests/replication_test @@ -19,7 +19,7 @@ # under the License. # -# Run the federation tests. +# Run a test of the replication feature MY_DIR=`dirname \`which $0\`` PYTHON_DIR=${MY_DIR}/../../../python @@ -37,15 +37,18 @@ stop_brokers() { } if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port + rm -f queue-*.repl replication-*.log #cleanup from any earlier runs + + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port BROKER_A=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port BROKER_B=`cat qpidd.port` export PYTHONPATH=$PYTHON_DIR + echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B" $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication - $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication + $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication #create test queues @@ -58,7 +61,6 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c #publish and consume from test queus on broker A: - rm -f queue-*.repl for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done @@ -79,6 +81,9 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl + + stop_brokers + tail -5 queue-a-input.repl > queue-a-expected.repl tail -10 queue-b-input.repl > queue-b-expected.repl diff queue-a-backup.repl queue-a-expected.repl || FAIL=1 @@ -87,12 +92,12 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e if [[ $FAIL ]]; then echo replication test failed: expectations not met! + exit 1 else echo queue state replicated as expected - rm queue-*.repl + rm -f queue-*.repl replication-*.log fi - stop_brokers else echo "Skipping replication test, plugins not built or python utils not located" fi |
