diff options
| author | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-10-18 19:36:13 +0000 |
| commit | a08d54e27d4e91b52c5979cc566ab3e933878983 (patch) | |
| tree | 7f57ad88051e4a02f52d4bdf395968549e24f57a /cpp/src/tests | |
| parent | 8e53bc375ef2bfb4b05cc32b4a8c0042d95b9ec2 (diff) | |
| download | qpid-python-a08d54e27d4e91b52c5979cc566ab3e933878983.tar.gz | |
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt.
qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies
the broker makes the expected calls on broker::Cluster in various situations.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1023966 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rw-r--r-- | cpp/src/tests/BrokerClusterCalls.cpp | 435 | ||||
| -rw-r--r-- | cpp/src/tests/Makefile.am | 3 | ||||
| -rw-r--r-- | cpp/src/tests/cluster.mk | 2 |
3 files changed, 438 insertions, 2 deletions
diff --git a/cpp/src/tests/BrokerClusterCalls.cpp b/cpp/src/tests/BrokerClusterCalls.cpp new file mode 100644 index 0000000000..2b06d9ce34 --- /dev/null +++ b/cpp/src/tests/BrokerClusterCalls.cpp @@ -0,0 +1,435 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +///@file +// Tests using a dummy broker::Cluster implementation to verify the expected +// Cluster functions are called for various actions on the broker. +// + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/broker/Cluster.h" +#include "qpid/broker/Queue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Duration.h" +#include "BrokerFixture.h" +#include <boost/assign.hpp> +#include <boost/format.hpp> +#include <boost/regex.hpp> + +using namespace std; +using namespace boost; +using namespace boost::assign; +using namespace qpid::messaging; +using boost::format; +using boost::regex; + +namespace qpid { +namespace tests { + +class DummyCluster : public broker::Cluster +{ + private: + /** Flag used to ignore events other than enqueues while routing, + * e.g. acquires and accepts generated in a ring queue to replace an element.. + * In real impl would be a thread-local variable. + */ + bool isRouting; + + void recordQm(const string& op, const broker::QueuedMessage& qm) { + history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() + % qm.position % qm.payload->getFrames().getContent()).str(); + } + void recordStr(const string& op, const string& name) { + history += (format("%s(%s)") % op % name).str(); + } + public: + // Messages + + virtual void routing(const boost::intrusive_ptr<broker::Message>& m) { + isRouting = true; + history += (format("routing(%s)") % m->getFrames().getContent()).str(); + } + + virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); } + + virtual void routed(const boost::intrusive_ptr<broker::Message>& m) { + history += (format("routed(%s)") % m->getFrames().getContent()).str(); + isRouting = false; + } + virtual void acquire(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("acquire", qm); + } + virtual void accept(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("accept", qm); + } + virtual void reject(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("reject", qm); + } + virtual void rejected(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("rejected", qm); + } + virtual void release(const broker::QueuedMessage& qm) { + if (!isRouting) recordQm("release", qm); + } + virtual void dequeue(const broker::QueuedMessage& qm) { + // Never ignore dequeue, used to avoid resource leaks. + recordQm("dequeue", qm); + } + + // Consumers + + virtual void consume(const broker::Queue& q, size_t n) { + history += (format("consume(%s, %d)") % q.getName() % n).str(); + } + virtual void cancel(const broker::Queue& q, size_t n) { + history += (format("cancel(%s, %d)") % q.getName() % n).str(); + } + + // Wiring + + virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } + virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } + virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } + virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } + virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { + history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); + } + vector<string> history; +}; + +QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) + +// Broker fixture with DummyCluster set up and some new API client bits. +struct DummyClusterFixture: public BrokerFixture { + Connection c; + Session s; + DummyCluster*dc; + DummyClusterFixture() { + broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster)); + dc = &static_cast<DummyCluster&>(broker->getCluster()); + c = Connection("localhost:"+lexical_cast<string>(getPort())); + c.open(); + s = c.createSession(); + } + ~DummyClusterFixture() { + c.close(); + } +}; + +QPID_AUTO_TEST_CASE(testSimplePubSub) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // Queue creation + Sender sender = f.s.createSender("q;{create:always,delete:always}"); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + // Consumer + Receiver receiver = f.s.createReceiver("q"); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + // Don't check size here as it is uncertain whether acquire has happened yet. + + // Acquire message + Message m = receiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Acknowledge message + f.s.acknowledge(true); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Close a consumer + receiver.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Destroy the queue + f.c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testReleaseReject) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); + sender.send(Message("a")); + Receiver receiver = f.s.createReceiver("q"); + Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); + Message m = receiver.fetch(Duration::SECOND); + h.clear(); + + // Explicit release + f.s.release(m); + f.s.sync(); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Implicit release on closing connection. + Connection c("localhost:"+lexical_cast<string>(f.getPort())); + c.open(); + Session s = c.createSession(); + Receiver r = s.createReceiver("q"); + m = r.fetch(Duration::SECOND); + h.clear(); + i = 0; + c.close(); + BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Reject message, goes to alternate exchange. + m = receiver.fetch(Duration::SECOND); + h.clear(); + i = 0; + f.s.reject(m); + BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); + m = altReceiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(m.getContent(), "a"); + + // Timed out message + h.clear(); + i = 0; + m = Message("t"); + m.setTtl(Duration(1)); // Timeout 1ms + sender.send(m); + usleep(2000); // Sleep 2ms + bool received = receiver.fetch(m, Duration::IMMEDIATE); + BOOST_CHECK(!received); // Timed out + BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Message replaced on LVQ + sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); + m = Message("a"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + m = Message("b"); + m.getProperties()["qpid.LVQ_key"] = "foo"; + sender.send(m); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + // FIXME: bug in Queue.cpp gives the incorrect position when + // dequeueing a replaced LVQ message. + // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1 + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + receiver = f.s.createReceiver("lvq"); + BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); + f.s.acknowledge(true); + BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testFanout) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); + Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); + Sender sender = f.s.createSender("amq.fanout"); + r1.setCapacity(0); // Don't receive immediately. + r2.setCapacity(0); + h.clear(); + int i = 0; + + // Send message + sender.send(Message("a")); + f.s.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++)); + BOOST_CHECK_REGEX("enqueue\\(amq.fanout_r[12], 1, a\\)", h.at(i++)); + BOOST_CHECK(h.at(i-1) != h.at(i-2)); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // Receive messages + Message m1 = r1.fetch(Duration::SECOND); + f.s.acknowledge(m1, true); + Message m2 = r2.fetch(Duration::SECOND); + f.s.acknowledge(m2, true); + + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testRingQueue) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + + // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, + // so we can't do this: + // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); + // Must use old API to declare ring queue: + qpid::client::Connection c; + f.open(c); + qpid::client::Session s = c.newSession(); + qpid::framing::FieldTable args; + args.setInt("qpid.max_size", 3); + args.setString("qpid.policy_type","ring"); + s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); + c.close(); + Sender sender = f.s.createSender("ring"); + + int i = 0; + // Send message + sender.send(Message("a")); + sender.send(Message("b")); + sender.send(Message("c")); + sender.send(Message("d")); + f.s.sync(); + + BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); + + BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); + + Receiver receiver = f.s.createReceiver("ring"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); + f.s.acknowledge(true); + + BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); + BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); + + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_CASE(testTransactions) { + DummyClusterFixture f; + vector<string>& h = f.dc->history; + Session ts = f.c.createTransactionalSession(); + Sender sender = ts.createSender("q;{create:always,delete:always}"); + int i = 0; + BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. + BOOST_CHECK_EQUAL(h.size(), i); + + sender.send(Message("a")); + sender.send(Message("b")); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); + BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); + BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); + BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit + ts.commit(); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + + // FIXME aconway 2010-10-18: As things stand the cluster is not + // compatible with transactions + // - enqueues occur after routing is complete. + // - no transaction context associated with messages in the Cluster interface. + // - no call to Cluster::accept in Queue::dequeueCommitted + + Receiver receiver = ts.createReceiver("q"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); + BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); + ts.acknowledge(); + ts.sync(); + BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); + ts.commit(); + ts.sync(); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); + // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); + BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); + BOOST_CHECK_EQUAL(h.size(), i); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests + diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 02454971cb..5e1438d813 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -123,7 +123,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Variant.cpp \ Address.cpp \ ClientMessage.cpp \ - Qmf2.cpp + Qmf2.cpp \ + BrokerClusterCalls.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index da191e8682..a033025226 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -77,7 +77,7 @@ cluster_test_SOURCES = \ PartialFailure.cpp \ ClusterFailover.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework -lboost_regex qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail |
