diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-09-11 13:33:42 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-09-11 13:33:42 +0000 |
commit | b171cc419ae5d2bc747ec2465ad1c76445f8bd37 (patch) | |
tree | 2f4d01f55832b1cee196214eae31af47f4ca4a78 /cpp/src/tests/QueueTest.cpp | |
parent | 613071992900172cb00b5eff9f39b1cc06a5e2a8 (diff) | |
download | qpid-python-b171cc419ae5d2bc747ec2465ad1c76445f8bd37.tar.gz |
Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@813825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 134 |
1 files changed, 130 insertions, 4 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index a655baeab8..b6932434b6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -22,6 +22,8 @@ #include "test_tools.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -30,12 +32,14 @@ #include "qpid/broker/ExpiryPolicy.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" +#include "qpid/framing/reply_exceptions.h" #include <iostream> #include "boost/format.hpp" using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; +using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -273,14 +277,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ } -class TestMessageStoreOC : public NullMessageStore +const std::string nullxid = ""; + +class SimpleDummyCtxt : public TransactionContext {}; + +class DummyCtxt : public TPCTransactionContext +{ + const std::string xid; +public: + DummyCtxt(const std::string& _xid) : xid(_xid) {} + static std::string getXid(TransactionContext& ctxt) + { + DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); + return c ? c->xid : nullxid; + } +}; + +class TestMessageStoreOC : public MessageStore { + std::set<std::string> prepared; + uint64_t nextPersistenceId; public: uint enqCnt; uint deqCnt; bool error; + TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} + ~TestMessageStoreOC(){} + virtual void dequeue(TransactionContext*, const boost::intrusive_ptr<PersistableMessage>& /*msg*/, const PersistableQueue& /*queue*/) @@ -302,8 +327,32 @@ class TestMessageStoreOC : public NullMessageStore error=true; } - TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {} - ~TestMessageStoreOC(){} + bool init(const Options*) { return true; } + void truncateInit(const bool) {} + void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } + void destroy(PersistableQueue&) {} + void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableExchange&) {} + void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} + void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } + void destroy(const PersistableConfig&) {} + void stage(const boost::intrusive_ptr<PersistableMessage>&) {} + void destroy(PersistableMessage&) {} + void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} + void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, + std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } + void flush(const qpid::broker::PersistableQueue&) {} + uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } + + std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } + std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } + void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } + void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } + void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } + + void recover(RecoveryManager&) {} }; @@ -703,7 +752,7 @@ not requeued to the store. QPID_AUTO_TEST_CASE(testLastNodeJournalError){ /* -simulate store excption going into last node standing +simulate store exception going into last node standing */ TestMessageStoreOC testStore; @@ -727,6 +776,83 @@ simulate store excption going into last node standing } +intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) { + intrusive_ptr<Message> msg(new Message()); + AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); + AMQFrame header((AMQHeaderBody())); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; +} + +QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){ + + TestMessageStoreOC testStore; + client::QueueOptions args; + args.setSizePolicy(FLOW_TO_DISK, 0, 1); + + intrusive_ptr<Message> msg1 = mkMsg("e", "A"); + intrusive_ptr<Message> msg2 = mkMsg("e", "B"); + intrusive_ptr<Message> msg3 = mkMsg("e", "C"); + intrusive_ptr<Message> msg4 = mkMsg("e", "D"); + intrusive_ptr<Message> msg5 = mkMsg("e", "E"); + intrusive_ptr<Message> msg6 = mkMsg("e", "F"); + intrusive_ptr<Message> msg7 = mkMsg("e", "G"); + msg4->forcePersistent(); + msg5->forcePersistent(); + msg7->forcePersistent(); + + DeliverableMessage dmsg1(msg1); + DeliverableMessage dmsg2(msg2); + DeliverableMessage dmsg3(msg3); + DeliverableMessage dmsg4(msg4); + DeliverableMessage dmsg5(msg5); + DeliverableMessage dmsg6(msg6); + DeliverableMessage dmsg7(msg7); + + FanOutExchange fanout1("fanout1", false, args); + FanOutExchange fanout2("fanout2", false, args); + + Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); + queue1->configure(args); + Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); + queue2->configure(args); + Queue::shared_ptr queue3(new Queue("queue3", true)); + fanout1.bind(queue1, "", 0); + fanout1.bind(queue2, "", 0); + fanout1.route(dmsg1, "", 0); + msg1->releaseContent(); + fanout1.route(dmsg2, "", 0); + msg2->releaseContent(); + fanout1.route(dmsg3, "", 0); + msg3->releaseContent(); + + BOOST_CHECK_EQUAL(3, queue1->getMessageCount()); + BOOST_CHECK_EQUAL(3, queue2->getMessageCount()); + BOOST_CHECK_EQUAL(msg1->isContentReleased(), false); + BOOST_CHECK_EQUAL(msg2->isContentReleased(), true); + BOOST_CHECK_EQUAL(msg3->isContentReleased(), true); + + fanout1.bind(queue3, "", 0); + fanout1.route(dmsg4, "", 0); + msg4->releaseContent(); + BOOST_CHECK_EQUAL(msg4->isContentReleased(), false); + fanout1.route(dmsg5, "", 0); + msg5->releaseContent(); + BOOST_CHECK_EQUAL(msg5->isContentReleased(), false); + + fanout2.bind(queue3, "", 0); + fanout2.route(dmsg6, "", 0); + fanout2.route(dmsg7, "", 0); + msg6->releaseContent(); + BOOST_CHECK_EQUAL(msg6->isContentReleased(), false); + msg7->releaseContent(); + BOOST_CHECK_EQUAL(msg7->isContentReleased(), false); + +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |