summaryrefslogtreecommitdiff
path: root/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-09-11 13:33:42 +0000
committerKim van der Riet <kpvdr@apache.org>2009-09-11 13:33:42 +0000
commitb171cc419ae5d2bc747ec2465ad1c76445f8bd37 (patch)
tree2f4d01f55832b1cee196214eae31af47f4ca4a78 /cpp/src/tests/QueueTest.cpp
parent613071992900172cb00b5eff9f39b1cc06a5e2a8 (diff)
downloadqpid-python-b171cc419ae5d2bc747ec2465ad1c76445f8bd37.tar.gz
Joint checkin with cctrieloff. Refactor of exchange routing so that multi-queue policy differences may be resolved and allow for correct multi-queue flow-to-disk behavior. Different queues may have differing policies and persistence properties - these were previously being neglected. New c++ test added.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@813825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r--cpp/src/tests/QueueTest.cpp134
1 files changed, 130 insertions, 4 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index a655baeab8..b6932434b6 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -22,6 +22,8 @@
#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -30,12 +32,14 @@
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
+#include "qpid/framing/reply_exceptions.h"
#include <iostream>
#include "boost/format.hpp"
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
+using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -273,14 +277,35 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
}
-class TestMessageStoreOC : public NullMessageStore
+const std::string nullxid = "";
+
+class SimpleDummyCtxt : public TransactionContext {};
+
+class DummyCtxt : public TPCTransactionContext
+{
+ const std::string xid;
+public:
+ DummyCtxt(const std::string& _xid) : xid(_xid) {}
+ static std::string getXid(TransactionContext& ctxt)
+ {
+ DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
+ return c ? c->xid : nullxid;
+ }
+};
+
+class TestMessageStoreOC : public MessageStore
{
+ std::set<std::string> prepared;
+ uint64_t nextPersistenceId;
public:
uint enqCnt;
uint deqCnt;
bool error;
+ TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
+ ~TestMessageStoreOC(){}
+
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
@@ -302,8 +327,32 @@ class TestMessageStoreOC : public NullMessageStore
error=true;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
+ bool init(const Options*) { return true; }
+ void truncateInit(const bool) {}
+ void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
+ void destroy(PersistableQueue&) {}
+ void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableExchange&) {}
+ void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
+ void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
+ void destroy(const PersistableConfig&) {}
+ void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
+ void destroy(PersistableMessage&) {}
+ void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
+ void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
+ std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
+ void flush(const qpid::broker::PersistableQueue&) {}
+ uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
+
+ std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
+ std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
+ void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
+ void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
+ void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
+
+ void recover(RecoveryManager&) {}
};
@@ -703,7 +752,7 @@ not requeued to the store.
QPID_AUTO_TEST_CASE(testLastNodeJournalError){
/*
-simulate store excption going into last node standing
+simulate store exception going into last node standing
*/
TestMessageStoreOC testStore;
@@ -727,6 +776,83 @@ simulate store excption going into last node standing
}
+intrusive_ptr<Message> mkMsg(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testFlowToDiskMsgProperties){
+
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ args.setSizePolicy(FLOW_TO_DISK, 0, 1);
+
+ intrusive_ptr<Message> msg1 = mkMsg("e", "A");
+ intrusive_ptr<Message> msg2 = mkMsg("e", "B");
+ intrusive_ptr<Message> msg3 = mkMsg("e", "C");
+ intrusive_ptr<Message> msg4 = mkMsg("e", "D");
+ intrusive_ptr<Message> msg5 = mkMsg("e", "E");
+ intrusive_ptr<Message> msg6 = mkMsg("e", "F");
+ intrusive_ptr<Message> msg7 = mkMsg("e", "G");
+ msg4->forcePersistent();
+ msg5->forcePersistent();
+ msg7->forcePersistent();
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+ DeliverableMessage dmsg7(msg7);
+
+ FanOutExchange fanout1("fanout1", false, args);
+ FanOutExchange fanout2("fanout2", false, args);
+
+ Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
+ queue1->configure(args);
+ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
+ queue2->configure(args);
+ Queue::shared_ptr queue3(new Queue("queue3", true));
+ fanout1.bind(queue1, "", 0);
+ fanout1.bind(queue2, "", 0);
+ fanout1.route(dmsg1, "", 0);
+ msg1->releaseContent();
+ fanout1.route(dmsg2, "", 0);
+ msg2->releaseContent();
+ fanout1.route(dmsg3, "", 0);
+ msg3->releaseContent();
+
+ BOOST_CHECK_EQUAL(3, queue1->getMessageCount());
+ BOOST_CHECK_EQUAL(3, queue2->getMessageCount());
+ BOOST_CHECK_EQUAL(msg1->isContentReleased(), false);
+ BOOST_CHECK_EQUAL(msg2->isContentReleased(), true);
+ BOOST_CHECK_EQUAL(msg3->isContentReleased(), true);
+
+ fanout1.bind(queue3, "", 0);
+ fanout1.route(dmsg4, "", 0);
+ msg4->releaseContent();
+ BOOST_CHECK_EQUAL(msg4->isContentReleased(), false);
+ fanout1.route(dmsg5, "", 0);
+ msg5->releaseContent();
+ BOOST_CHECK_EQUAL(msg5->isContentReleased(), false);
+
+ fanout2.bind(queue3, "", 0);
+ fanout2.route(dmsg6, "", 0);
+ fanout2.route(dmsg7, "", 0);
+ msg6->releaseContent();
+ BOOST_CHECK_EQUAL(msg6->isContentReleased(), false);
+ msg7->releaseContent();
+ BOOST_CHECK_EQUAL(msg7->isContentReleased(), false);
+
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests