summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
committerGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
commitafefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch)
tree70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/tests
parent1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff)
downloadqpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates * added support for acknowledged transfer over inter-broker bridges * added option to qpid-route to control this git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/Makefile.am5
-rw-r--r--cpp/src/tests/ProxyTest.cpp52
-rw-r--r--cpp/src/tests/QueueEvents.cpp4
-rwxr-xr-xcpp/src/tests/federation.py23
-rwxr-xr-xcpp/src/tests/reliable_replication_test98
-rwxr-xr-xcpp/src/tests/replication_test19
6 files changed, 178 insertions, 23 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 314b90ba8b..ff2ee060c2 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -89,7 +89,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ManagementTest.cpp \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
- QueueEvents.cpp
+ QueueEvents.cpp \
+ ProxyTest.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -249,7 +250,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test $(unit_wrappers)
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak
+LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test
EXTRA_DIST+=$(LONG_TESTS) run_perftest
check-long:
$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
diff --git a/cpp/src/tests/ProxyTest.cpp b/cpp/src/tests/ProxyTest.cpp
new file mode 100644
index 0000000000..9007f3dc97
--- /dev/null
+++ b/cpp/src/tests/ProxyTest.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/ExecutionSyncBody.h"
+#include "qpid/framing/Proxy.h"
+#include <alloca.h>
+
+#include "unit_test.h"
+
+using namespace qpid::framing;
+
+QPID_AUTO_TEST_SUITE(ProxyTestSuite)
+
+
+QPID_AUTO_TEST_CASE(testScopedSync)
+{
+ struct DummyHandler : FrameHandler
+ {
+ void handle(AMQFrame& f) {
+ AMQMethodBody* m = f.getMethod();
+ BOOST_CHECK(m);
+ BOOST_CHECK(m->isA<ExecutionSyncBody>());
+ BOOST_CHECK(m->isSync());
+ }
+ };
+ DummyHandler f;
+ Proxy p(f);
+ Proxy::ScopedSync s(p);
+ p.send(ExecutionSyncBody(p.getVersion()));
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/QueueEvents.cpp b/cpp/src/tests/QueueEvents.cpp
index 7aea23922d..f6b76b69ba 100644
--- a/cpp/src/tests/QueueEvents.cpp
+++ b/cpp/src/tests/QueueEvents.cpp
@@ -168,7 +168,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
}
fixture.connection.close();
- fixture.shutdownBroker();
+ fixture.broker->getQueueEvents().shutdown();
//check listener was notified of all events, and in correct order
SequenceNumber enqueueId(1);
@@ -215,7 +215,7 @@ QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
}
fixture.connection.close();
- fixture.shutdownBroker();
+ fixture.broker->getQueueEvents().shutdown();
//check listener was notified of all events, and in correct order
SequenceNumber enqueueId(1);
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index ad82964007..9b0be8f979 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -64,7 +64,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -88,7 +88,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -135,7 +135,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -195,7 +195,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -244,8 +244,8 @@ class FederationTests(TestBase010):
l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
- l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
- r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
self.assertEqual(l_res.status, 0)
self.assertEqual(r_res.status, 0)
@@ -296,7 +296,7 @@ class FederationTests(TestBase010):
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
- "exclude-me,also-exclude-me", False, False, False)
+ "exclude-me,also-exclude-me", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -354,7 +354,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -401,7 +401,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -448,7 +448,7 @@ class FederationTests(TestBase010):
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -478,8 +478,7 @@ class FederationTests(TestBase010):
sleep(3)
self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
self.assertEqual(len(qmf.getObjects(_class="link")), 0)
-
-
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
diff --git a/cpp/src/tests/reliable_replication_test b/cpp/src/tests/reliable_replication_test
new file mode 100755
index 0000000000..1a2fa917b3
--- /dev/null
+++ b/cpp/src/tests/reliable_replication_test
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test reliability of the replication feature in the face of link
+# failures:
+MY_DIR=`dirname \`which $0\``
+PYTHON_DIR=${MY_DIR}/../../../python
+
+trap stop_brokers EXIT
+
+stop_brokers() {
+ if [[ $BROKER_A ]] ; then
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+ fi
+ if [[ $BROKER_B ]] ; then
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ fi
+}
+
+setup() {
+ rm -f replication-source.log replication-dest.log
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
+ BROKER_A=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ #../qpidd --port 5555 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 &
+ #BROKER_A=5555
+
+ #../qpidd --port 6666 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 &
+ #BROKER_B=6666
+ echo "Testing replication from port $BROKER_A to port $BROKER_B"
+ export PYTHONPATH=$PYTHON_DIR
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+ #create test queue (only replicate enqueues for this test):
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
+}
+
+send() {
+ ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < replicated.expected
+}
+
+receive() {
+ rm -f replicated.actual
+ ./receiver --port $BROKER_B --queue queue-a > replicated.actual
+}
+
+bounce_link() {
+ echo "Destroying link..."
+ $PYTHON_DIR/commands/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
+ echo "Link destroyed; recreating route..."
+ sleep 2
+ $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+ echo "Route re-established"
+}
+
+if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
+ setup
+ for i in `seq 1 100000`; do echo Message $i; done > replicated.expected
+ send &
+ receive &
+ for i in `seq 1 5`; do sleep 10; bounce_link; done;
+ wait
+ #check that received list is identical to sent list
+ diff replicated.actual replicated.expected || FAIL=1
+ if [[ $FAIL ]]; then
+ echo reliable replication test failed: expectations not met!
+ else
+ echo replication reliable in the face of link failures
+ rm -f replication.actual replication.expected replication-source.log replication-dest.log
+ fi
+fi
+
diff --git a/cpp/src/tests/replication_test b/cpp/src/tests/replication_test
index 931078c047..9b6e5cfb29 100755
--- a/cpp/src/tests/replication_test
+++ b/cpp/src/tests/replication_test
@@ -19,7 +19,7 @@
# under the License.
#
-# Run the federation tests.
+# Run a test of the replication feature
MY_DIR=`dirname \`which $0\``
PYTHON_DIR=${MY_DIR}/../../../python
@@ -37,15 +37,18 @@ stop_brokers() {
}
if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port
+ rm -f queue-*.repl replication-*.log #cleanup from any earlier runs
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
BROKER_A=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
BROKER_B=`cat qpidd.port`
export PYTHONPATH=$PYTHON_DIR
+ echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B"
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
- $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
#create test queues
@@ -58,7 +61,6 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
#publish and consume from test queus on broker A:
- rm -f queue-*.repl
for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done
for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done
for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done
@@ -79,6 +81,9 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e
./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl
./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
+
+ stop_brokers
+
tail -5 queue-a-input.repl > queue-a-expected.repl
tail -10 queue-b-input.repl > queue-b-expected.repl
diff queue-a-backup.repl queue-a-expected.repl || FAIL=1
@@ -87,12 +92,12 @@ if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e
if [[ $FAIL ]]; then
echo replication test failed: expectations not met!
+ exit 1
else
echo queue state replicated as expected
- rm queue-*.repl
+ rm -f queue-*.repl replication-*.log
fi
- stop_brokers
else
echo "Skipping replication test, plugins not built or python utils not located"
fi