diff options
| author | Alan Conway <aconway@apache.org> | 2010-10-25 18:00:34 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-10-25 18:00:34 +0000 |
| commit | f27a733f9a4cca3ad2a42acb35ab4620a47e320d (patch) | |
| tree | a4d7d7a34a6cf42e1241e998f7da012ee37b109f /cpp/src/tests | |
| parent | 2c422462dc717e667c13aa74bbc552c8507e3f83 (diff) | |
| download | qpid-python-f27a733f9a4cca3ad2a42acb35ab4620a47e320d.tar.gz | |
New cluster: core framework and initial implementation of enqueue logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1027210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/BrokerClusterCalls.cpp | 48 | ||||
| -rwxr-xr-x | cpp/src/tests/cluster2_tests.py | 66 | ||||
| -rwxr-xr-x | cpp/src/tests/run_cluster_tests | 2 | ||||
| -rw-r--r-- | cpp/src/tests/test_env.sh.in | 1 |
4 files changed, 93 insertions, 24 deletions
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp index 6cdd6fc9bf..f659702387 100644 --- a/cpp/src/tests/BrokerClusterCalls.cpp +++ b/cpp/src/tests/BrokerClusterCalls.cpp @@ -42,6 +42,7 @@ using namespace boost; using namespace boost::assign; using namespace qpid::messaging; using boost::format; +using boost::intrusive_ptr; namespace qpid { namespace tests { @@ -59,6 +60,9 @@ class DummyCluster : public broker::Cluster history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() % qm.position % qm.payload->getFrames().getContent()).str(); } + void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) { + history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); + } void recordStr(const string& op, const string& name) { history += (format("%s(%s)") % op % name).str(); } @@ -70,7 +74,10 @@ class DummyCluster : public broker::Cluster history += (format("routing(%s)") % m->getFrames().getContent()).str(); } - virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); } + virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) { + recordMsg("enqueue", q, msg); + return true; + } virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { history += (format("routed(%s)") % m->getFrames().getContent()).str(); @@ -91,9 +98,8 @@ class DummyCluster : public broker::Cluster virtual void release(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("release", qm); } - virtual void dequeue(const broker::QueuedMessage& qm) { - // Never ignore dequeue, used to avoid resource leaks. - recordQm("dequeue", qm); + virtual void drop(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("dequeue", qm); } // Consumers @@ -156,7 +162,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) { sender.send(Message("a")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); // Don't check size here as it is uncertain whether acquire has happened yet. @@ -221,7 +227,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.reject(m); BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); @@ -239,7 +245,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { bool received = receiver.fetch(m, Duration::IMMEDIATE); BOOST_CHECK(!received); // Timed out BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -252,7 +258,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -261,11 +267,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) { sender.send(m); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - // FIXME: bug in Queue.cpp gives the incorrect position when - // dequeueing a replaced LVQ message. - // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1 - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); @@ -345,20 +347,19 @@ QPID_AUTO_TEST_CASE(testRingQueue) { BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); - BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); Receiver receiver = f.s.createReceiver("ring"); @@ -399,15 +400,16 @@ QPID_AUTO_TEST_CASE(testTransactions) { BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit ts.commit(); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); - BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)"); - BOOST_CHECK_EQUAL(h.size(), i); - // FIXME aconway 2010-10-18: As things stand the cluster is not // compatible with transactions - // - enqueues occur after routing is complete. + // - enqueues occur after routing is complete + // - no call to Cluster::enqueue, should be in Queue::process? // - no transaction context associated with messages in the Cluster interface. // - no call to Cluster::accept in Queue::dequeueCommitted + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + Receiver receiver = ts.createReceiver("q"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); diff --git a/cpp/src/tests/cluster2_tests.py b/cpp/src/tests/cluster2_tests.py new file mode 100755 index 0000000000..e3a19ae2a0 --- /dev/null +++ b/cpp/src/tests/cluster2_tests.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess +from qpid import datatypes, messaging +from qpid.brokertest import * +from qpid.harness import Skipped +from qpid.messaging import Message +from qpid.messaging.exceptions import Empty +from threading import Thread, Lock +from logging import getLogger +from itertools import chain + +log = getLogger("qpid.cluster_tests") + +class Cluster2Tests(BrokerTest): + """Tests for new cluster code.""" + + def test_message_enqueue(self): + """Test basic replication of enqueued messages.""" + + cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"]) + + sn0 = cluster[0].connect().session() + r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + s0 = sn0.sender("amq.fanout"); + + sn1 = cluster[1].connect().session() + r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}"); + r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}"); + + + # Send messages on member 0 + content = ["a","b","c"] + for m in content: s0.send(Message(m)) + + # Browse on both members. + def check(content, receiver): + for c in content: self.assertEqual(c, receiver.fetch(1).content) + self.assertRaises(Empty, receiver.fetch, 0) + + check(content, r0p) + check(content, r0q) + check(content, r1p) + check(content, r1q) + + sn1.connection.close() + sn0.connection.close() diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index e136d3810a..3971a39144 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -33,5 +33,5 @@ mkdir -p $OUTDIR CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 +with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index b5c3b0fa3d..96fe6b64f4 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/test_store.so exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; } exportmodule ACL_LIB acl.so exportmodule CLUSTER_LIB cluster.so +exportmodule CLUSTER2_LIB cluster2.so exportmodule REPLICATING_LISTENER_LIB replicating_listener.so exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so exportmodule SSLCONNECTOR_LIB sslconnector.so |
