summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
committerAlan Conway <aconway@apache.org>2010-10-25 18:00:34 +0000
commitf27a733f9a4cca3ad2a42acb35ab4620a47e320d (patch)
treea4d7d7a34a6cf42e1241e998f7da012ee37b109f /cpp/src/tests
parent2c422462dc717e667c13aa74bbc552c8507e3f83 (diff)
downloadqpid-python-f27a733f9a4cca3ad2a42acb35ab4620a47e320d.tar.gz
New cluster: core framework and initial implementation of enqueue logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1027210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/BrokerClusterCalls.cpp48
-rwxr-xr-xcpp/src/tests/cluster2_tests.py66
-rwxr-xr-xcpp/src/tests/run_cluster_tests2
-rw-r--r--cpp/src/tests/test_env.sh.in1
4 files changed, 93 insertions, 24 deletions
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp
index 6cdd6fc9bf..f659702387 100644
--- a/cpp/src/tests/BrokerClusterCalls.cpp
+++ b/cpp/src/tests/BrokerClusterCalls.cpp
@@ -42,6 +42,7 @@ using namespace boost;
using namespace boost::assign;
using namespace qpid::messaging;
using boost::format;
+using boost::intrusive_ptr;
namespace qpid {
namespace tests {
@@ -59,6 +60,9 @@ class DummyCluster : public broker::Cluster
history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
% qm.position % qm.payload->getFrames().getContent()).str();
}
+ void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
+ history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
+ }
void recordStr(const string& op, const string& name) {
history += (format("%s(%s)") % op % name).str();
}
@@ -70,7 +74,10 @@ class DummyCluster : public broker::Cluster
history += (format("routing(%s)") % m->getFrames().getContent()).str();
}
- virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); }
+ virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
+ recordMsg("enqueue", q, msg);
+ return true;
+ }
virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
history += (format("routed(%s)") % m->getFrames().getContent()).str();
@@ -91,9 +98,8 @@ class DummyCluster : public broker::Cluster
virtual void release(const broker::QueuedMessage& qm) {
if (!isRouting) recordQm("release", qm);
}
- virtual void dequeue(const broker::QueuedMessage& qm) {
- // Never ignore dequeue, used to avoid resource leaks.
- recordQm("dequeue", qm);
+ virtual void drop(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("dequeue", qm);
}
// Consumers
@@ -156,7 +162,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) {
sender.send(Message("a"));
f.s.sync();
BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
// Don't check size here as it is uncertain whether acquire has happened yet.
@@ -221,7 +227,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
f.s.reject(m);
BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)");
@@ -239,7 +245,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
bool received = receiver.fetch(m, Duration::IMMEDIATE);
BOOST_CHECK(!received); // Timed out
BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
BOOST_CHECK_EQUAL(h.size(), i);
@@ -252,7 +258,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
f.s.sync();
BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
BOOST_CHECK_EQUAL(h.size(), i);
@@ -261,11 +267,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
sender.send(m);
f.s.sync();
BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
- // FIXME: bug in Queue.cpp gives the incorrect position when
- // dequeueing a replaced LVQ message.
- // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
BOOST_CHECK_EQUAL(h.size(), i);
@@ -345,20 +347,19 @@ QPID_AUTO_TEST_CASE(testRingQueue) {
BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
- BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
Receiver receiver = f.s.createReceiver("ring");
@@ -399,15 +400,16 @@ QPID_AUTO_TEST_CASE(testTransactions) {
BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
ts.commit();
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
- BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)");
- BOOST_CHECK_EQUAL(h.size(), i);
-
// FIXME aconway 2010-10-18: As things stand the cluster is not
// compatible with transactions
- // - enqueues occur after routing is complete.
+ // - enqueues occur after routing is complete
+ // - no call to Cluster::enqueue, should be in Queue::process?
// - no transaction context associated with messages in the Cluster interface.
// - no call to Cluster::accept in Queue::dequeueCommitted
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
Receiver receiver = ts.createReceiver("q");
BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
diff --git a/cpp/src/tests/cluster2_tests.py b/cpp/src/tests/cluster2_tests.py
new file mode 100755
index 0000000000..e3a19ae2a0
--- /dev/null
+++ b/cpp/src/tests/cluster2_tests.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess
+from qpid import datatypes, messaging
+from qpid.brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from qpid.messaging.exceptions import Empty
+from threading import Thread, Lock
+from logging import getLogger
+from itertools import chain
+
+log = getLogger("qpid.cluster_tests")
+
+class Cluster2Tests(BrokerTest):
+ """Tests for new cluster code."""
+
+ def test_message_enqueue(self):
+ """Test basic replication of enqueued messages."""
+
+ cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"])
+
+ sn0 = cluster[0].connect().session()
+ r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+ s0 = sn0.sender("amq.fanout");
+
+ sn1 = cluster[1].connect().session()
+ r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+
+ # Send messages on member 0
+ content = ["a","b","c"]
+ for m in content: s0.send(Message(m))
+
+ # Browse on both members.
+ def check(content, receiver):
+ for c in content: self.assertEqual(c, receiver.fetch(1).content)
+ self.assertRaises(Empty, receiver.fetch, 0)
+
+ check(content, r0p)
+ check(content, r0q)
+ check(content, r1p)
+ check(content, r1q)
+
+ sn1.connection.close()
+ sn0.connection.close()
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index e136d3810a..3971a39144 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in
index b5c3b0fa3d..96fe6b64f4 100644
--- a/cpp/src/tests/test_env.sh.in
+++ b/cpp/src/tests/test_env.sh.in
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule CLUSTER2_LIB cluster2.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so