/* * * Copyright (c) 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ ///@file // Tests using a dummy broker::Cluster implementation to verify the expected // Cluster functions are called for various actions on the broker. // #include "unit_test.h" #include "test_tools.h" #include "qpid/broker/Cluster.h" #include "qpid/broker/Queue.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/Duration.h" #include "BrokerFixture.h" #include #include using namespace std; using namespace boost; using namespace boost::assign; using namespace qpid::messaging; using boost::format; using boost::intrusive_ptr; namespace qpid { namespace tests { class DummyCluster : public broker::Cluster { private: /** Flag used to ignore events other than enqueues while routing, * e.g. acquires and accepts generated in a ring queue to replace an element.. * In real impl would be a thread-local variable. */ bool isRouting; void recordQm(const string& op, const broker::QueuedMessage& qm) { history += (format("%s(%s, %d, %s)") % op % qm.queue->getName() % qm.position % qm.payload->getFrames().getContent()).str(); } void recordMsg(const string& op, broker::Queue& q, intrusive_ptr msg) { history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str(); } void recordStr(const string& op, const string& name) { history += (format("%s(%s)") % op % name).str(); } public: // Messages virtual void routing(const boost::intrusive_ptr& m) { isRouting = true; history += (format("routing(%s)") % m->getFrames().getContent()).str(); } virtual bool enqueue(broker::Queue& q, const intrusive_ptr&msg) { recordMsg("enqueue", q, msg); return true; } virtual void routed(const boost::intrusive_ptr& m) { history += (format("routed(%s)") % m->getFrames().getContent()).str(); isRouting = false; } virtual void acquire(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("acquire", qm); } virtual void accept(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("accept", qm); } virtual void reject(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("reject", qm); } virtual void rejected(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("rejected", qm); } virtual void release(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("release", qm); } virtual void drop(const broker::QueuedMessage& qm) { if (!isRouting) recordQm("dequeue", qm); } // Consumers virtual void consume(const broker::Queue& q, size_t n) { history += (format("consume(%s, %d)") % q.getName() % n).str(); } virtual void cancel(const broker::Queue& q, size_t n) { history += (format("cancel(%s, %d)") % q.getName() % n).str(); } // Wiring virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); } virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); } virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); } virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); } virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) { history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str(); } vector history; }; QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite) // Broker fixture with DummyCluster set up and some new API client bits. struct DummyClusterFixture: public BrokerFixture { Connection c; Session s; DummyCluster*dc; DummyClusterFixture() { broker->setCluster(auto_ptr(new DummyCluster)); dc = &static_cast(broker->getCluster()); c = Connection("localhost:"+lexical_cast(getPort())); c.open(); s = c.createSession(); } ~DummyClusterFixture() { c.close(); } }; QPID_AUTO_TEST_CASE(testSimplePubSub) { DummyClusterFixture f; vector& h = f.dc->history; // Queue creation Sender sender = f.s.createSender("q;{create:always,delete:always}"); size_t i = 0; BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. BOOST_CHECK_EQUAL(h.size(), i); // Consumer Receiver receiver = f.s.createReceiver("q"); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); BOOST_CHECK_EQUAL(h.size(), i); // Send message sender.send(Message("a")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); // Don't check size here as it is uncertain whether acquire has happened yet. // Acquire message Message m = receiver.fetch(Duration::SECOND); BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); // Acknowledge message f.s.acknowledge(true); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); // Close a consumer receiver.close(); BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)"); BOOST_CHECK_EQUAL(h.size(), i); // Destroy the queue f.c.close(); BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)"); BOOST_CHECK_EQUAL(h.size(), i); } QPID_AUTO_TEST_CASE(testReleaseReject) { DummyClusterFixture f; vector& h = f.dc->history; Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}"); sender.send(Message("a")); Receiver receiver = f.s.createReceiver("q"); Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}"); Message m = receiver.fetch(Duration::SECOND); h.clear(); // Explicit release f.s.release(m); f.s.sync(); size_t i = 0; BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); // Implicit release on closing connection. Connection c("localhost:"+lexical_cast(f.getPort())); c.open(); Session s = c.createSession(); Receiver r = s.createReceiver("q"); m = r.fetch(Duration::SECOND); h.clear(); i = 0; c.close(); BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)"); BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); // Reject message, goes to alternate exchange. m = receiver.fetch(Duration::SECOND); h.clear(); i = 0; f.s.reject(m); BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); m = altReceiver.fetch(Duration::SECOND); BOOST_CHECK_EQUAL(m.getContent(), "a"); // Timed out message h.clear(); i = 0; m = Message("t"); m.setTtl(Duration(1)); // Timeout 1ms sender.send(m); usleep(2000); // Sleep 2ms bool received = receiver.fetch(m, Duration::IMMEDIATE); BOOST_CHECK(!received); // Timed out BOOST_CHECK_EQUAL(h.at(i++), "routing(t)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(t)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); BOOST_CHECK_EQUAL(h.size(), i); // Message replaced on LVQ sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}"); m = Message("a"); m.getProperties()["qpid.LVQ_key"] = "foo"; sender.send(m); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.size(), i); m = Message("b"); m.getProperties()["qpid.LVQ_key"] = "foo"; sender.send(m); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); receiver = f.s.createReceiver("lvq"); BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); f.s.acknowledge(true); BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)"); BOOST_CHECK_EQUAL(h.size(), i); } QPID_AUTO_TEST_CASE(testFanout) { DummyClusterFixture f; vector& h = f.dc->history; Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}"); Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}"); Sender sender = f.s.createSender("amq.fanout"); r1.setCapacity(0); // Don't receive immediately. r2.setCapacity(0); h.clear(); size_t i = 0; // Send message sender.send(Message("a")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); BOOST_CHECK(h.at(i-1) != h.at(i-2)); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.size(), i); // Receive messages Message m1 = r1.fetch(Duration::SECOND); f.s.acknowledge(m1, true); Message m2 = r2.fetch(Duration::SECOND); f.s.acknowledge(m2, true); BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)"); BOOST_CHECK_EQUAL(h.size(), i); } QPID_AUTO_TEST_CASE(testRingQueue) { DummyClusterFixture f; vector& h = f.dc->history; // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working, // so we can't do this: // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}"); // Must use old API to declare ring queue: qpid::client::Connection c; f.open(c); qpid::client::Session s = c.newSession(); qpid::framing::FieldTable args; args.setInt("qpid.max_size", 3); args.setString("qpid.policy_type","ring"); s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args); c.close(); Sender sender = f.s.createSender("ring"); size_t i = 0; // Send message sender.send(Message("a")); sender.send(Message("b")); sender.send(Message("c")); sender.send(Message("d")); f.s.sync(); BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(c)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(c)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(d)"); BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); Receiver receiver = f.s.createReceiver("ring"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d"); f.s.acknowledge(true); BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)"); BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)"); BOOST_CHECK_EQUAL(h.size(), i); } QPID_AUTO_TEST_CASE(testTransactions) { DummyClusterFixture f; vector& h = f.dc->history; Session ts = f.c.createTransactionalSession(); Sender sender = ts.createSender("q;{create:always,delete:always}"); size_t i = 0; BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking. BOOST_CHECK_EQUAL(h.size(), i); sender.send(Message("a")); sender.send(Message("b")); ts.sync(); BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(a)"); BOOST_CHECK_EQUAL(h.at(i++), "routing(b)"); BOOST_CHECK_EQUAL(h.at(i++), "routed(b)"); BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit ts.commit(); // FIXME aconway 2010-10-18: As things stand the cluster is not // compatible with transactions // - enqueues occur after routing is complete // - no call to Cluster::enqueue, should be in Queue::process? // - no transaction context associated with messages in the Cluster interface. // - no call to Cluster::accept in Queue::dequeueCommitted // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)"); // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)"); BOOST_CHECK_EQUAL(h.size(), i); Receiver receiver = ts.createReceiver("q"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a"); BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); ts.acknowledge(); ts.sync(); BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)"); BOOST_CHECK_EQUAL(h.size(), i); ts.commit(); ts.sync(); // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)"); // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)"); BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)"); BOOST_CHECK_EQUAL(h.size(), i); } QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests