summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-08-10 12:04:27 +0000
committerGordon Sim <gsim@apache.org>2012-08-10 12:04:27 +0000
commitdf36b35eb7ca20c3b354d6895004fb201346482b (patch)
tree357d90752f44304284639014f3b9db0cae1f2b2b /qpid/cpp/src/tests
parent798cebf0e4f41953eb542d6358e5f0eea33d85a7 (diff)
downloadqpid-python-df36b35eb7ca20c3b354d6895004fb201346482b.tar.gz
QPID-4178: broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1371676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/tests/ClientSessionTest.cpp2
-rw-r--r--qpid/cpp/src/tests/DeliveryRecordTest.cpp2
-rw-r--r--qpid/cpp/src/tests/ExchangeTest.cpp80
-rw-r--r--qpid/cpp/src/tests/Makefile.am5
-rw-r--r--qpid/cpp/src/tests/MessageBuilderTest.cpp190
-rw-r--r--qpid/cpp/src/tests/MessageTest.cpp59
-rw-r--r--qpid/cpp/src/tests/MessageUtils.h61
-rw-r--r--qpid/cpp/src/tests/QueueDepth.cpp105
-rw-r--r--qpid/cpp/src/tests/QueueEvents.cpp238
-rw-r--r--qpid/cpp/src/tests/QueueFlowLimitTest.cpp38
-rw-r--r--qpid/cpp/src/tests/QueuePolicyTest.cpp135
-rw-r--r--qpid/cpp/src/tests/QueueRegistryTest.cpp23
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp1267
-rw-r--r--qpid/cpp/src/tests/ReplicationTest.cpp144
-rw-r--r--qpid/cpp/src/tests/TxMocks.h2
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp98
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py10
-rw-r--r--qpid/cpp/src/tests/test_store.cpp3
19 files changed, 414 insertions, 2053 deletions
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 9a77514f5f..f88c0eb58e 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -126,6 +126,7 @@ set(unit_tests_to_build
ExchangeTest
HeadersExchangeTest
MessageTest
+ QueueDepth
QueueRegistryTest
QueuePolicyTest
QueueFlowLimitTest
@@ -135,16 +136,12 @@ set(unit_tests_to_build
TimerTest
TopicExchangeTest
TxBufferTest
- TxPublishTest
- MessageBuilderTest
ManagementTest
MessageReplayTracker
ConsoleTest
- QueueEvents
ProxyTest
RetryList
FrameDecoder
- ReplicationTest
ClientMessageTest
PollableCondition
Variant
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp
index 1905219bf2..1f07d2b83f 100644
--- a/qpid/cpp/src/tests/ClientSessionTest.cpp
+++ b/qpid/cpp/src/tests/ClientSessionTest.cpp
@@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted)
fix.session.queueDeclare(arg::queue="my-queue");
LocalQueue queue;
fix.subs.subscribe(queue, "my-queue");
-
+
ScopedSuppressLogging sl;
fix.session.queueDelete(arg::queue="my-queue");
BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException);
diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
index fb7bd2f727..c83bd9a6a4 100644
--- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
list<DeliveryRecord> records;
for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
- DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+ DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
r.setId(*i);
records.push_back(r);
}
diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp
index 66a16b9178..4f18b91b5a 100644
--- a/qpid/cpp/src/tests/ExchangeTest.cpp
+++ b/qpid/cpp/src/tests/ExchangeTest.cpp
@@ -35,7 +35,6 @@
using std::string;
-using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe)
queue.reset();
queue2.reset();
- intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id"));
- DeliverableMessage msg(msgPtr);
+ DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0);
topic.route(msg);
direct.route(msg);
-
}
QPID_AUTO_TEST_CASE(testIsBound)
@@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare)
BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
}
-intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- return msg;
-}
-
QPID_AUTO_TEST_CASE(testSequenceOptions)
{
FieldTable args;
@@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
{
DirectExchange direct("direct1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("e", "abc");
- intrusive_ptr<Message> msg2 = cmessage("e", "abc");
- intrusive_ptr<Message> msg3 = cmessage("e", "abc");
-
- DeliverableMessage dmsg1(msg1);
- DeliverableMessage dmsg2(msg2);
- DeliverableMessage dmsg3(msg3);
+ DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0);
- direct.route(dmsg1);
- direct.route(dmsg2);
- direct.route(dmsg3);
+ direct.route(msg1);
+ direct.route(msg2);
+ direct.route(msg3);
- BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
- BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+ BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
+ BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
FanOutExchange fanout("fanout1", false, args);
HeadersExchange header("headers1", false, args);
TopicExchange topic ("topic1", false, args);
// check other exchanges, that they preroute
- intrusive_ptr<Message> msg4 = cmessage("e", "abc");
- intrusive_ptr<Message> msg5 = cmessage("e", "abc");
+ DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0);
+ DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0);
- // Need at least empty header for the HeadersExchange to route at all
- msg5->insertCustomProperty("", "");
- intrusive_ptr<Message> msg6 = cmessage("e", "abc");
+ fanout.route(msg4);
+ BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
- DeliverableMessage dmsg4(msg4);
- DeliverableMessage dmsg5(msg5);
- DeliverableMessage dmsg6(msg6);
+ header.route(msg5);
+ BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
- fanout.route(dmsg4);
- BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
- header.route(dmsg5);
- BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
-
- topic.route(dmsg6);
- BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ topic.route(msg6);
+ BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
direct.encode(buffer);
}
{
@@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions)
buffer.reset();
DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer);
- intrusive_ptr<Message> msg1 = cmessage("e", "abc");
- DeliverableMessage dmsg1(msg1);
- exch_dec->route(dmsg1);
+ DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0);
+ exch_dec->route(msg1);
- BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64());
}
delete [] buff;
@@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption)
HeadersExchange header("headers1", false, args);
TopicExchange topic ("topic1", false, args);
- intrusive_ptr<Message> msg1 = cmessage("direct1", "abc");
- msg1->insertCustomProperty("a", "abc");
- DeliverableMessage dmsg1(msg1);
+ qpid::types::Variant::Map properties;
+ properties["routing-key"] = "abc";
+ properties["a"] = "abc";
+ Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1");
+ DeliverableMessage dmsg1(msg1, 0);
FieldTable args2;
args2.setString("x-match", "any");
@@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
Queue::shared_ptr queue2(new Queue("queue2", true));
Queue::shared_ptr queue3(new Queue("queue3", true));
- BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders()));
-
BOOST_CHECK(direct.bind(queue, "abc", 0));
BOOST_CHECK(fanout.bind(queue1, "abc", 0));
BOOST_CHECK(header.bind(queue2, "", &args2));
@@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption)
}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index b04ec6b43e..f9eed9270b 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ExchangeTest.cpp \
HeadersExchangeTest.cpp \
MessageTest.cpp \
+ QueueDepth.cpp \
QueueRegistryTest.cpp \
QueuePolicyTest.cpp \
QueueFlowLimitTest.cpp \
@@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
TimerTest.cpp \
TopicExchangeTest.cpp \
TxBufferTest.cpp \
- TxPublishTest.cpp \
- MessageBuilderTest.cpp \
ConnectionOptions.h \
ForkedBroker.h \
ForkedBroker.cpp \
ManagementTest.cpp \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
- QueueEvents.cpp \
ProxyTest.cpp \
RetryList.cpp \
FrameDecoder.cpp \
- ReplicationTest.cpp \
ClientMessageTest.cpp \
PollableCondition.cpp \
Variant.cpp \
diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp
deleted file mode 100644
index 9adb133d40..0000000000
--- a/qpid/cpp/src/tests/MessageBuilderTest.cpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageBuilder.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/TypeFilter.h"
-#include "unit_test.h"
-#include <list>
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qpid {
-namespace tests {
-
-class MockMessageStore : public NullMessageStore
-{
- enum Op {STAGE=1, APPEND=2};
-
- uint64_t id;
- boost::intrusive_ptr<PersistableMessage> expectedMsg;
- std::string expectedData;
- std::list<Op> ops;
-
- void checkExpectation(Op actual)
- {
- BOOST_CHECK_EQUAL(ops.front(), actual);
- ops.pop_front();
- }
-
- public:
- MockMessageStore() : id(0), expectedMsg(0) {}
-
- void expectStage(PersistableMessage& msg)
- {
- expectedMsg = &msg;
- ops.push_back(STAGE);
- }
-
- void expectAppendContent(PersistableMessage& msg, const std::string& data)
- {
- expectedMsg = &msg;
- expectedData = data;
- ops.push_back(APPEND);
- }
-
- void stage(const boost::intrusive_ptr<PersistableMessage>& msg)
- {
- checkExpectation(STAGE);
- BOOST_CHECK_EQUAL(expectedMsg, msg);
- msg->setPersistenceId(++id);
- }
-
- void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
- const std::string& data)
- {
- checkExpectation(APPEND);
- BOOST_CHECK_EQUAL(boost::static_pointer_cast<const PersistableMessage>(expectedMsg), msg);
- BOOST_CHECK_EQUAL(expectedData, data);
- }
-
- bool expectationsMet()
- {
- return ops.empty();
- }
-
- //don't treat this store as a null impl
- bool isNull() const
- {
- return false;
- }
-
-};
-
-QPID_AUTO_TEST_SUITE(MessageBuilderTestSuite)
-
-QPID_AUTO_TEST_CASE(testHeaderOnly)
-{
- MessageBuilder builder(0);
- builder.start(SequenceNumber());
-
- std::string exchange("builder-exchange");
- std::string key("builder-exchange");
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
-
- header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0);
- header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
- builder.handle(method);
- builder.handle(header);
-
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK_EQUAL(exchange, builder.getMessage()->getExchangeName());
- BOOST_CHECK_EQUAL(key, builder.getMessage()->getRoutingKey());
- BOOST_CHECK(builder.getMessage()->getFrames().isComplete());
-}
-
-QPID_AUTO_TEST_CASE(test1ContentFrame)
-{
- MessageBuilder builder(0);
- builder.start(SequenceNumber());
-
- std::string data("abcdefg");
- std::string exchange("builder-exchange");
- std::string key("builder-exchange");
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody(data)));
- method.setEof(false);
- header.setBof(false);
- header.setEof(false);
- content.setBof(false);
-
- header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());
- header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
- builder.handle(method);
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK(!builder.getMessage()->getFrames().isComplete());
-
- builder.handle(header);
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK(!builder.getMessage()->getFrames().isComplete());
-
- builder.handle(content);
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK(builder.getMessage()->getFrames().isComplete());
-}
-
-QPID_AUTO_TEST_CASE(test2ContentFrames)
-{
- MessageBuilder builder(0);
- builder.start(SequenceNumber());
-
- std::string data1("abcdefg");
- std::string data2("hijklmn");
- std::string exchange("builder-exchange");
- std::string key("builder-exchange");
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content1((AMQContentBody(data1)));
- AMQFrame content2((AMQContentBody(data2)));
- method.setEof(false);
- header.setBof(false);
- header.setEof(false);
- content1.setBof(false);
- content1.setEof(false);
- content2.setBof(false);
-
- header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
- header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
-
- builder.handle(method);
- builder.handle(header);
- builder.handle(content1);
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK(!builder.getMessage()->getFrames().isComplete());
-
- builder.handle(content2);
- BOOST_CHECK(builder.getMessage());
- BOOST_CHECK(builder.getMessage()->getFrames().isComplete());
-}
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp
index 3a3ed061f9..fe670a274e 100644
--- a/qpid/cpp/src/tests/MessageTest.cpp
+++ b/qpid/cpp/src/tests/MessageTest.cpp
@@ -24,6 +24,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/Uuid.h"
+#include "MessageUtils.h"
#include "unit_test.h"
@@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
{
string exchange = "MyExchange";
string routingKey = "MyRoutingKey";
+ uint64_t ttl(60);
Uuid messageId(true);
- string data1("abcdefg");
- string data2("hijklmn");
+ string data("abcdefghijklmn");
- boost::intrusive_ptr<Message> msg(new Message());
+ qpid::types::Variant::Map properties;
+ properties["routing-key"] = routingKey;
+ properties["ttl"] = ttl;
+ properties["durable"] = true;
+ properties["message-id"] = qpid::types::Uuid(messageId.data());
+ properties["abc"] = "xyz";
+ Message msg = MessageUtils::createMessage(properties, data);
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content1((AMQContentBody(data1)));
- AMQFrame content2((AMQContentBody(data2)));
+ std::string buffer;
+ encode(msg, buffer);
+ msg = Message();
+ decode(buffer, msg);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().append(content1);
- msg->getFrames().append(content2);
-
- MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- mProps->setContentLength(data1.size() + data2.size());
- mProps->setMessageId(messageId);
- FieldTable applicationHeaders;
- applicationHeaders.setString("abc", "xyz");
- mProps->setApplicationHeaders(applicationHeaders);
- DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
- dProps->setRoutingKey(routingKey);
- dProps->setDeliveryMode(PERSISTENT);
- BOOST_CHECK(msg->isPersistent());
-
- std::vector<char> buff(msg->encodedSize());
- Buffer wbuffer(&buff[0], msg->encodedSize());
- msg->encode(wbuffer);
-
- Buffer rbuffer(&buff[0], msg->encodedSize());
- msg = new Message();
- msg->decodeHeader(rbuffer);
- msg->decodeContent(rbuffer);
- BOOST_CHECK_EQUAL(exchange, msg->getExchangeName());
- BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey());
- BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize());
- BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength());
- BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
- BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc"));
- BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode());
- BOOST_CHECK(msg->isPersistent());
+ BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+ BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+ BOOST_CHECK_EQUAL(data, msg.getContent());
+ //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+ BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
+ BOOST_CHECK(msg.isPersistent());
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h
index 991e2a2714..c2eabd804d 100644
--- a/qpid/cpp/src/tests/MessageUtils.h
+++ b/qpid/cpp/src/tests/MessageUtils.h
@@ -20,9 +20,11 @@
*/
#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/types/Variant.h"
using namespace qpid;
using namespace broker;
@@ -33,11 +35,46 @@ namespace tests {
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="",
- const bool durable = false, const Uuid& messageId=Uuid(true),
- uint64_t contentSize = 0)
+ static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "")
{
- boost::intrusive_ptr<broker::Message> msg(new broker::Message());
+ boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
+
+ AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ if (content.size()) {
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size());
+ AMQFrame data((AMQContentBody(content)));
+ msg->getFrames().append(data);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if (i->first == "routing-key" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second);
+ } else if (i->first == "message-id" && !i->second.isVoid()) {
+ qpid::types::Uuid id = i->second;
+ qpid::framing::Uuid id2(id.data());
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2);
+ } else if (i->first == "ttl" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second);
+ } else if (i->first == "priority" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second);
+ } else if (i->first == "durable" && !i->second.isVoid()) {
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1);
+ } else {
+ msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second);
+ }
+ }
+ return Message(msg, msg);
+ }
+
+
+ static Message createMessage(const std::string& exchange="", const std::string& routingKey="",
+ uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true),
+ const std::string& content="")
+ {
+ boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer());
AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
@@ -45,18 +82,18 @@ struct MessageUtils
msg->getFrames().append(method);
msg->getFrames().append(header);
MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(contentSize);
+ props->setContentLength(content.size());
props->setMessageId(messageId);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
if (durable)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
- return msg;
- }
-
- static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data)
- {
- AMQFrame content((AMQContentBody(data)));
- msg->getFrames().append(content);
+ if (ttl)
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
+ if (content.size()) {
+ AMQFrame data((AMQContentBody(content)));
+ msg->getFrames().append(data);
+ }
+ return Message(msg, msg);
}
};
diff --git a/qpid/cpp/src/tests/QueueDepth.cpp b/qpid/cpp/src/tests/QueueDepth.cpp
new file mode 100644
index 0000000000..73556141ca
--- /dev/null
+++ b/qpid/cpp/src/tests/QueueDepth.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/QueueDepth.h"
+
+#include "unit_test.h"
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(QueueDepthTestSuite)
+
+using namespace qpid::broker;
+
+QPID_AUTO_TEST_CASE(testCompare)
+{
+ QueueDepth a(0, 0);
+ QueueDepth b(1, 1);
+ QueueDepth c(2, 2);
+ QueueDepth d(1, 1);
+
+ BOOST_CHECK(a < b);
+ BOOST_CHECK(b < c);
+ BOOST_CHECK(a < c);
+
+ BOOST_CHECK(b > a);
+ BOOST_CHECK(c > b);
+ BOOST_CHECK(c > a);
+
+ BOOST_CHECK(b == d);
+ BOOST_CHECK(d == b);
+ BOOST_CHECK(a != b);
+ BOOST_CHECK(b != a);
+
+ QueueDepth e; e.setCount(1);
+ QueueDepth f; f.setCount(2);
+ BOOST_CHECK(e < f);
+ BOOST_CHECK(f > e);
+
+ QueueDepth g; g.setSize(1);
+ QueueDepth h; h.setSize(2);
+ BOOST_CHECK(g < h);
+ BOOST_CHECK(h > g);
+}
+
+QPID_AUTO_TEST_CASE(testIncrement)
+{
+ QueueDepth a(5, 10);
+ QueueDepth b(3, 6);
+ QueueDepth c(8, 16);
+ a += b;
+ BOOST_CHECK(a == c);
+ BOOST_CHECK_EQUAL(8, a.getCount());
+ BOOST_CHECK_EQUAL(16, a.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testDecrement)
+{
+ QueueDepth a(5, 10);
+ QueueDepth b(3, 6);
+ QueueDepth c(2, 4);
+ a -= b;
+ BOOST_CHECK(a == c);
+ BOOST_CHECK_EQUAL(2, a.getCount());
+ BOOST_CHECK_EQUAL(4, a.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testAddition)
+{
+ QueueDepth a(5, 10);
+ QueueDepth b(3, 6);
+
+ QueueDepth c = a + b;
+ BOOST_CHECK_EQUAL(8, c.getCount());
+ BOOST_CHECK_EQUAL(16, c.getSize());
+}
+
+QPID_AUTO_TEST_CASE(testSubtraction)
+{
+ QueueDepth a(5, 10);
+ QueueDepth b(3, 6);
+
+ QueueDepth c = a - b;
+ BOOST_CHECK_EQUAL(2, c.getCount());
+ BOOST_CHECK_EQUAL(4, c.getSize());
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp
deleted file mode 100644
index cea8bbf0db..0000000000
--- a/qpid/cpp/src/tests/QueueEvents.cpp
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "MessageUtils.h"
-#include "unit_test.h"
-#include "BrokerFixture.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueEvents.h"
-#include "qpid/client/QueueOptions.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/sys/Dispatcher.h"
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-namespace qpid {
-namespace tests {
-
-QPID_AUTO_TEST_SUITE(QueueEventsSuite)
-
-using namespace qpid::client;
-using namespace qpid::broker;
-using namespace qpid::sys;
-using qpid::framing::SequenceNumber;
-
-struct EventChecker
-{
- typedef std::deque<QueueEvents::Event> Events;
-
- Events events;
- boost::shared_ptr<Poller> poller;
-
- void handle(QueueEvents::Event e)
- {
- if (events.empty()) {
- BOOST_FAIL("Unexpected event received");
- } else {
- BOOST_CHECK_EQUAL(events.front().type, e.type);
- BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue);
- BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload);
- BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position);
- events.pop_front();
- }
- if (events.empty() && poller) poller->shutdown();
- }
-
- void expect(QueueEvents::Event e)
- {
- events.push_back(e);
- }
-};
-
-QPID_AUTO_TEST_CASE(testBasicEventProcessing)
-{
- boost::shared_ptr<Poller> poller(new Poller());
- sys::Dispatcher dispatcher(poller);
- Thread dispatchThread(dispatcher);
- QueueEvents events(poller);
- EventChecker listener;
- listener.poller = poller;
- events.registerListener("dummy", boost::bind(&EventChecker::handle, &listener, _1));
- //signal occurence of some events:
- Queue queue("queue1");
- SequenceNumber id;
- QueuedMessage event1(&queue, MessageUtils::createMessage(), id);
- QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id);
-
- //define events expected by listener:
- listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1));
- listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2));
- listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1));
-
- events.enqueued(event1);
- events.enqueued(event2);
- events.dequeued(event1);
-
- dispatchThread.join();
- events.shutdown();
- events.unregisterListener("dummy");
-}
-
-
-struct EventRecorder
-{
- struct EventRecord
- {
- QueueEvents::EventType type;
- std::string queue;
- std::string content;
- SequenceNumber position;
- };
-
- typedef std::deque<EventRecord> Events;
-
- Events events;
-
- void handle(QueueEvents::Event event)
- {
- EventRecord record;
- record.type = event.type;
- record.queue = event.msg.queue->getName();
- event.msg.payload->getFrames().getContent(record.content);
- record.position = event.msg.position;
- events.push_back(record);
- }
-
- void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position)
- {
- if (events.empty()) {
- BOOST_FAIL("Missed event");
- } else {
- BOOST_CHECK_EQUAL(events.front().type, type);
- BOOST_CHECK_EQUAL(events.front().queue, queue);
- BOOST_CHECK_EQUAL(events.front().content, content);
- BOOST_CHECK_EQUAL(events.front().position, position);
- events.pop_front();
- }
- }
- void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position)
- {
- check(QueueEvents::ENQUEUE, queue, data, position);
- }
-
- void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position)
- {
- check(QueueEvents::DEQUEUE, queue, data, position);
- }
-};
-
-QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing)
-{
- SessionFixture fixture;
- //register dummy event listener to broker
- EventRecorder listener;
- fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
-
- //declare queue with event options specified
- QueueOptions options;
- options.enableQueueEvents(false);
- std::string q("queue-events-test");
- fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
- //send and consume some messages
- LocalQueue incoming;
- Subscription sub = fixture.subs.subscribe(incoming, q);
- for (int i = 0; i < 5; i++) {
- fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 0; i < 3; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- for (int i = 5; i < 10; i++) {
- fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 3; i < 10; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- fixture.connection.close();
- fixture.broker->getQueueEvents().shutdown();
-
- //check listener was notified of all events, and in correct order
- SequenceNumber enqueueId(1);
- SequenceNumber dequeueId(1);
- for (int i = 0; i < 5; i++) {
- listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
- }
- for (int i = 0; i < 3; i++) {
- listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
- }
- for (int i = 5; i < 10; i++) {
- listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
- }
- for (int i = 3; i < 10; i++) {
- listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++);
- }
-}
-
-QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly)
-{
- SessionFixture fixture;
- //register dummy event listener to broker
- EventRecorder listener;
- fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1));
-
- //declare queue with event options specified
- QueueOptions options;
- options.enableQueueEvents(true);
- std::string q("queue-events-test");
- fixture.session.queueDeclare(arg::queue=q, arg::arguments=options);
- //send and consume some messages
- LocalQueue incoming;
- Subscription sub = fixture.subs.subscribe(incoming, q);
- for (int i = 0; i < 5; i++) {
- fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 0; i < 3; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- for (int i = 5; i < 10; i++) {
- fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 3; i < 10; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- fixture.connection.close();
- fixture.broker->getQueueEvents().shutdown();
-
- //check listener was notified of all events, and in correct order
- SequenceNumber enqueueId(1);
- SequenceNumber dequeueId(1);
- for (int i = 0; i < 5; i++) {
- listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
- }
- for (int i = 5; i < 10; i++) {
- listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++);
- }
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
index bd868398f8..d305ca452b 100644
--- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
+++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
@@ -23,8 +23,8 @@
#include "unit_test.h"
#include "test_tools.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
@@ -66,21 +66,19 @@ public:
return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
- static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings)
+ static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
{
+ QueueSettings settings;
+ settings.populate(arguments, settings.storeSettings);
return QueueFlowLimit::createLimit(0, settings);
}
};
-
-
-QueuedMessage createMessage(uint32_t size)
+Message createMessage(uint32_t size)
{
static uint32_t seqNum;
- QueuedMessage msg;
- msg.payload = MessageUtils::createMessage();
- msg.position = ++seqNum;
- MessageUtils::addContent(msg.payload, std::string (size, 'x'));
+ Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
+ msg.setSequence(++seqNum);
return msg;
}
}
@@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
- std::deque<QueuedMessage> msgs;
+ std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
flow->enqueued(msgs.back());
@@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount)
BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF
}
-
QPID_AUTO_TEST_CASE(testFlowSize)
{
FieldTable args;
@@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize)
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
- std::deque<QueuedMessage> msgs;
+ std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
msgs.push_back(createMessage(10));
flow->enqueued(msgs.back());
@@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize)
BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
- QueuedMessage msg_9 = createMessage(9);
+ Message msg_9 = createMessage(9);
flow->enqueued(msg_9);
BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
- QueuedMessage tinyMsg_1 = createMessage(1);
+ Message tinyMsg_1 = createMessage(1);
flow->enqueued(tinyMsg_1);
BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
- QueuedMessage tinyMsg_2 = createMessage(1);
+ Message tinyMsg_2 = createMessage(1);
flow->enqueued(tinyMsg_2);
BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
msgs.push_back(createMessage(10));
@@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
- std::deque<QueuedMessage> msgs_1;
- std::deque<QueuedMessage> msgs_10;
- std::deque<QueuedMessage> msgs_50;
- std::deque<QueuedMessage> msgs_100;
+ std::deque<Message> msgs_1;
+ std::deque<Message> msgs_10;
+ std::deque<Message> msgs_50;
+ std::deque<Message> msgs_100;
- QueuedMessage msg;
+ Message msg;
std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0
@@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
}
}
-
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp
index f735e09449..00e964602a 100644
--- a/qpid/cpp/src/tests/QueuePolicyTest.cpp
+++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp
@@ -22,12 +22,10 @@
#include "unit_test.h"
#include "test_tools.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/reply_exceptions.h"
-#include "MessageUtils.h"
#include "BrokerFixture.h"
using namespace qpid::broker;
@@ -39,118 +37,10 @@ namespace tests {
QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
-namespace {
-QueuedMessage createMessage(uint32_t size)
-{
- QueuedMessage msg;
- msg.payload = MessageUtils::createMessage();
- MessageUtils::addContent(msg.payload, std::string (size, 'x'));
- return msg;
-}
-}
-
-QPID_AUTO_TEST_CASE(testCount)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
- BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
- BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
-
- QueuedMessage msg = createMessage(10);
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message");
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testSize)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
- QueuedMessage msg = createMessage(10);
-
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testBoth)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
- try {
- QueuedMessage msg = createMessage(51);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- std::vector<QueuedMessage> messages;
- messages.push_back(createMessage(15));
- messages.push_back(createMessage(10));
- messages.push_back(createMessage(11));
- messages.push_back(createMessage(2));
- messages.push_back(createMessage(7));
- for (size_t i = 0; i < messages.size(); i++) {
- policy->tryEnqueue(messages[i].payload);
- }
- //size = 45 at this point, count = 5
- try {
- QueuedMessage msg = createMessage(5);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
- try {
- QueuedMessage msg = createMessage(10);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
-
- policy->dequeued(messages[0]);
- try {
- QueuedMessage msg = createMessage(20);
- policy->tryEnqueue(msg.payload);
- } catch (const ResourceLimitExceededException&) {
- BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
- }
-}
-
-QPID_AUTO_TEST_CASE(testSettings)
-{
- //test reading and writing the policy from/to field table
- std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
- FieldTable settings;
- a->update(settings);
- std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
- BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
- BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
-}
-
QPID_AUTO_TEST_CASE(testRingPolicyCount)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING, 0, 5);
SessionFixture f;
std::string q("my-ring-queue");
@@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
// Ring queue, 500 bytes maxSize
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING, 500, 0);
SessionFixture f;
std::string q("my-ring-queue");
@@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
QPID_AUTO_TEST_CASE(testStrictRingPolicy)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(RING_STRICT, 0, 5);
+ args.setString("qpid.flow_stop_count", "0");
SessionFixture f;
std::string q("my-ring-queue");
@@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy)
QPID_AUTO_TEST_CASE(testPolicyWithDtx)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(REJECT, 0, 5);
SessionFixture f;
std::string q("my-policy-queue");
@@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore)
QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
+ QueueOptions args;
+ args.setSizePolicy(REJECT, 0, 5);
SessionFixture f;
std::string q("q");
diff --git a/qpid/cpp/src/tests/QueueRegistryTest.cpp b/qpid/cpp/src/tests/QueueRegistryTest.cpp
index ae555539a4..364d66c525 100644
--- a/qpid/cpp/src/tests/QueueRegistryTest.cpp
+++ b/qpid/cpp/src/tests/QueueRegistryTest.cpp
@@ -19,6 +19,7 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "unit_test.h"
#include <string>
@@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare)
QueueRegistry reg;
std::pair<Queue::shared_ptr, bool> qc;
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
Queue::shared_ptr q = qc.first;
BOOST_CHECK(q);
BOOST_CHECK(qc.second); // New queue
BOOST_CHECK_EQUAL(foo, q->getName());
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
BOOST_CHECK_EQUAL(q, qc.first);
BOOST_CHECK(!qc.second);
- qc = reg.declare(bar, false, 0, 0);
+ qc = reg.declare(bar, QueueSettings());
q = qc.first;
BOOST_CHECK(q);
BOOST_CHECK_EQUAL(true, qc.second);
BOOST_CHECK_EQUAL(bar, q->getName());
}
-QPID_AUTO_TEST_CASE(testDeclareTmp)
-{
- QueueRegistry reg;
- std::pair<Queue::shared_ptr, bool> qc;
-
- qc = reg.declare(std::string(), false, 0, 0);
- BOOST_CHECK(qc.second);
- BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName());
-}
-
QPID_AUTO_TEST_CASE(testFind)
{
std::string foo("foo");
@@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind)
BOOST_CHECK(reg.find(foo) == 0);
- reg.declare(foo, false, 0, 0);
- reg.declare(bar, false, 0, 0);
+ reg.declare(foo, QueueSettings());
+ reg.declare(bar, QueueSettings());
Queue::shared_ptr q = reg.find(bar);
BOOST_CHECK(q);
BOOST_CHECK_EQUAL(bar, q->getName());
@@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy)
QueueRegistry reg;
std::pair<Queue::shared_ptr, bool> qc;
- qc = reg.declare(foo, false, 0, 0);
+ qc = reg.declare(foo, QueueSettings());
reg.destroy(foo);
// Queue is gone from the registry.
BOOST_CHECK(reg.find(foo) == 0);
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index aa7132828a..3dfe3863f4 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -38,8 +38,8 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/QueueSettings.h"
#include <iostream>
#include <vector>
@@ -56,196 +56,47 @@ using namespace qpid::sys;
namespace qpid {
namespace tests {
-
class TestConsumer : public virtual Consumer{
public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
- QueuedMessage last;
+ QueueCursor lastCursor;
+ Message lastMessage;
bool received;
- TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {};
+ TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
- virtual bool deliver(QueuedMessage& msg){
- last = msg;
+ virtual bool deliver(const QueueCursor& cursor, const Message& message){
+ lastCursor = cursor;
+ lastMessage = message;
received = true;
return true;
};
void notify() {}
void cancel() {}
- void acknowledged(const QueuedMessage&) {}
+ void acknowledged(const DeliveryRecord&) {}
OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable
{
- boost::intrusive_ptr<Message> msg;
+ Message msg;
public:
FailOnDeliver() : msg(MessageUtils::createMessage()) {}
void deliverTo(const boost::shared_ptr<Queue>& queue)
{
throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
}
- Message& getMessage() { return *(msg.get()); }
+ Message& getMessage() { return msg; }
};
-intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl);
- return msg;
-}
-
-intrusive_ptr<Message> contentMessage(string content) {
- intrusive_ptr<Message> m(MessageUtils::createMessage());
- MessageUtils::addContent(m, content);
- return m;
-}
-
-string getContent(intrusive_ptr<Message> m) {
- return m->getFrames().getContent();
-}
-
QPID_AUTO_TEST_SUITE(QueueTestSuite)
-QPID_AUTO_TEST_CASE(testAsyncMessage) {
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> received;
-
- TestConsumer::shared_ptr c1(new TestConsumer());
- queue->consume(c1);
-
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
- queue->process(msg1);
- sleep(2);
-
- BOOST_CHECK(!c1->received);
- msg1->enqueueComplete();
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
-}
-
-
-QPID_AUTO_TEST_CASE(testAsyncMessageCount){
- Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process
-
- queue->process(msg1);
- sleep(2);
- uint32_t compval=0;
- BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
- msg1->enqueueComplete();
- compval=1;
- BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount());
- BOOST_CHECK_EQUAL(compval, queue->getMessageCount());
-}
-
-QPID_AUTO_TEST_CASE(testConsumers){
- Queue::shared_ptr queue(new Queue("my_queue", true));
-
- //Test adding consumers:
- TestConsumer::shared_ptr c1(new TestConsumer());
- TestConsumer::shared_ptr c2(new TestConsumer());
- queue->consume(c1);
- queue->consume(c2);
-
- BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
-
- //Test basic delivery:
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- queue->deliver(msg1);
- BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get());
-
- queue->deliver(msg2);
- BOOST_CHECK(queue->dispatch(c2));
- BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get());
-
- c1->received = false;
- queue->deliver(msg3);
- BOOST_CHECK(queue->dispatch(c1));
- BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get());
-
- //Test cancellation:
- queue->cancel(c1);
- BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount());
- queue->cancel(c2);
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount());
-}
-
-QPID_AUTO_TEST_CASE(testRegistry){
- //Test use of queues in registry:
- QueueRegistry registry;
- registry.declare("queue1", true, true);
- registry.declare("queue2", true, true);
- registry.declare("queue3", true, true);
-
- BOOST_CHECK(registry.find("queue1"));
- BOOST_CHECK(registry.find("queue2"));
- BOOST_CHECK(registry.find("queue3"));
-
- registry.destroy("queue1");
- registry.destroy("queue2");
- registry.destroy("queue3");
-
- BOOST_CHECK(!registry.find("queue1"));
- BOOST_CHECK(!registry.find("queue2"));
- BOOST_CHECK(!registry.find("queue3"));
-}
-
-QPID_AUTO_TEST_CASE(testDequeue){
- Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> received;
-
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
- BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
- BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
-
- TestConsumer::shared_ptr consumer(new TestConsumer());
- queue->consume(consumer);
- queue->dispatch(consumer);
- if (!consumer->received)
- sleep(2);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
- received = queue->get().payload;
- BOOST_CHECK(!received);
- BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
-
-}
-
QPID_AUTO_TEST_CASE(testBound){
//test the recording of bindings, and use of those to allow a queue to be unbound
string key("my-key");
FieldTable args;
- Queue::shared_ptr queue(new Queue("my-queue", true));
+ Queue::shared_ptr queue(new Queue("my-queue"));
ExchangeRegistry exchanges;
//establish bindings from exchange->queue and notify the queue as it is bound:
Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
@@ -273,423 +124,69 @@ QPID_AUTO_TEST_CASE(testBound){
exchange3->route(deliverable);
}
-QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
- client::QueueOptions args;
- args.setPersistLastNode();
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
-
- //change mode
- queue->setLastNodeFailure();
-
- //enqueue 1 message
- queue->deliver(msg3);
-
- //check all have persistent ids.
- BOOST_CHECK(msg1->isPersistent());
- BOOST_CHECK(msg2->isPersistent());
- BOOST_CHECK(msg3->isPersistent());
-
-}
-
-
-QPID_AUTO_TEST_CASE(testSeek){
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- TestConsumer::shared_ptr consumer(new TestConsumer("test", false));
- SequenceNumber seq(2);
- consumer->setPosition(seq);
-
- QueuedMessage qm;
- queue->dispatch(consumer);
-
- BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get());
- queue->dispatch(consumer);
- queue->dispatch(consumer); // make sure over-run is safe
-
-}
-
-QPID_AUTO_TEST_CASE(testSearch){
-
- Queue::shared_ptr queue(new Queue("my-queue", true));
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
-
- //enqueue 2 messages
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
-
- SequenceNumber seq(2);
- QueuedMessage qm;
- TestConsumer::shared_ptr c1(new TestConsumer());
-
- BOOST_CHECK(queue->find(seq, qm));
+QPID_AUTO_TEST_CASE(testLVQ){
- BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue());
-
- queue->acquire(qm, c1->getName());
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
- SequenceNumber seq1(3);
- QueuedMessage qm1;
- BOOST_CHECK(queue->find(seq1, qm1));
- BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue());
-
-}
-const std::string nullxid = "";
-
-class SimpleDummyCtxt : public TransactionContext {};
+ QueueSettings settings;
+ string key="key";
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
-class DummyCtxt : public TPCTransactionContext
-{
- const std::string xid;
- public:
- DummyCtxt(const std::string& _xid) : xid(_xid) {}
- static std::string getXid(TransactionContext& ctxt)
- {
- DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
- return c ? c->xid : nullxid;
+ const char* values[] = { "a", "b", "c", "a"};
+ for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
-};
-
-class TestMessageStoreOC : public MessageStore
-{
- std::set<std::string> prepared;
- uint64_t nextPersistenceId;
- public:
-
- uint enqCnt;
- uint deqCnt;
- bool error;
-
- TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {}
- ~TestMessageStoreOC(){}
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
- virtual void dequeue(TransactionContext*,
- const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
- const PersistableQueue& /*queue*/)
- {
- if (error) throw Exception("Dequeue error test");
- deqCnt++;
- }
+ TestConsumer::shared_ptr c(new TestConsumer("test", true));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());
- virtual void enqueue(TransactionContext*,
- const boost::intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& /* queue */)
- {
- if (error) throw Exception("Enqueue error test");
- enqCnt++;
- msg->enqueueComplete();
- }
- void createError()
- {
- error=true;
+ const char* values2[] = { "a", "b", "c"};
+ for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
}
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
- bool init(const Options*) { return true; }
- void truncateInit(const bool) {}
- void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); }
- void destroy(PersistableQueue&) {}
- void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); }
- void destroy(const PersistableExchange&) {}
- void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
- void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {}
- void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); }
- void destroy(const PersistableConfig&) {}
- void stage(const boost::intrusive_ptr<PersistableMessage>&) {}
- void destroy(PersistableMessage&) {}
- void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {}
- void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&,
- std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); }
- void flush(const qpid::broker::PersistableQueue&) {}
- uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; }
-
- std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); }
- std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); }
- void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); }
- void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
- void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); }
- void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); }
-
- void recover(RecoveryManager&) {}
-};
-
-
-QPID_AUTO_TEST_CASE(testLVQOrdering){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> msg4 = createMessage("e", "D");
- intrusive_ptr<Message> received;
-
- //set deliever match for LVQ a,b,c,a
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
-
- //enqueue 4 message
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
- queue->deliver(msg4);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg3.get(), received.get());
-
- intrusive_ptr<Message> msg5 = createMessage("e", "A");
- intrusive_ptr<Message> msg6 = createMessage("e", "B");
- intrusive_ptr<Message> msg7 = createMessage("e", "C");
- msg5->insertCustomProperty(key,"a");
- msg6->insertCustomProperty(key,"b");
- msg7->insertCustomProperty(key,"c");
- queue->deliver(msg5);
- queue->deliver(msg6);
- queue->deliver(msg7);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg5.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg6.get(), received.get());
-
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg7.get(), received.get());
-
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
}
QPID_AUTO_TEST_CASE(testLVQEmptyKey){
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- queue->deliver(msg1);
- queue->deliver(msg2);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQAcquire){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
- // disable flow control, as this test violates the enqueue/dequeue sequence.
- args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-
- Queue::shared_ptr queue(new Queue("my-queue", true ));
- queue->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- intrusive_ptr<Message> msg3 = createMessage("e", "C");
- intrusive_ptr<Message> msg4 = createMessage("e", "D");
- intrusive_ptr<Message> msg5 = createMessage("e", "F");
- intrusive_ptr<Message> msg6 = createMessage("e", "G");
-
- //set deliever match for LVQ a,b,c,a
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"b");
- msg3->insertCustomProperty(key,"c");
- msg4->insertCustomProperty(key,"a");
- msg5->insertCustomProperty(key,"b");
- msg6->insertCustomProperty(key,"c");
-
- //enqueue 4 message
- queue->deliver(msg1);
- queue->deliver(msg2);
- queue->deliver(msg3);
- queue->deliver(msg4);
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- framing::SequenceNumber sequence(1);
- QueuedMessage qmsg(queue.get(), msg1, sequence);
- QueuedMessage qmsg2(queue.get(), msg2, ++sequence);
- framing::SequenceNumber sequence1(10);
- QueuedMessage qmsg3(queue.get(), 0, sequence1);
- TestConsumer::shared_ptr dummy(new TestConsumer());
-
- BOOST_CHECK(!queue->acquire(qmsg, dummy->getName()));
- BOOST_CHECK(queue->acquire(qmsg2, dummy->getName()));
- // Acquire the massage again to test failure case.
- BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName()));
- BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName()));
-
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
-
- queue->deliver(msg5);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- // set mode to no browse and check
- args.setOrdering(client::LVQ_NO_BROWSE);
- queue->configure(args);
- TestConsumer::shared_ptr c1(new TestConsumer("test", false));
-
- queue->dispatch(c1);
- queue->dispatch(c1);
- queue->dispatch(c1);
-
- queue->deliver(msg6);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
-
- intrusive_ptr<Message> received;
- received = queue->get().payload;
- BOOST_CHECK_EQUAL(msg4.get(), received.get());
-
-}
-
-QPID_AUTO_TEST_CASE(testLVQMultiQueue){
-
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
-
- Queue::shared_ptr queue1(new Queue("my-queue", true ));
- Queue::shared_ptr queue2(new Queue("my-queue", true ));
- intrusive_ptr<Message> received;
- queue1->configure(args);
- queue2->configure(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "A");
-
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
-
- queue1->deliver(msg1);
- queue2->deliver(msg1);
- queue1->deliver(msg2);
-
- received = queue1->get().payload;
- BOOST_CHECK_EQUAL(msg2.get(), received.get());
-
- received = queue2->get().payload;
- BOOST_CHECK_EQUAL(msg1.get(), received.get());
+ QueueSettings settings;
+ string key="key";
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
-}
-QPID_AUTO_TEST_CASE(testLVQRecover){
-
-/* simulate this
- 1. start 2 nodes
- 2. create cluster durable lvq
- 3. send a transient message to the queue
- 4. kill one of the nodes (to trigger force persistent behaviour)...
- 5. then restart it (to turn off force persistent behaviour)
- 6. send another transient message with same lvq key as in 3
- 7. kill the second node again (retrigger force persistent)
- 8. stop and recover the first node
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setOrdering(client::LVQ);
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->create(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
- intrusive_ptr<Message> msg2 = createMessage("e", "A");
- // 2
- string key;
- args.getLVQKey(key);
- BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
-
- msg1->insertCustomProperty(key,"a");
- msg2->insertCustomProperty(key,"a");
- // 3
- queue1->deliver(msg1);
- // 4
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- // 5
- queue1->clearLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- // 6
- queue1->deliver(msg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
- BOOST_CHECK_EQUAL(testStore.deqCnt, 1u);
+ qpid::types::Variant::Map properties;
+ properties["key"] = "a";
+ q->deliver(MessageUtils::createMessage(properties, "one"));
+ properties.clear();
+ q->deliver(MessageUtils::createMessage(properties, "two"));
+ BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
}
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
- m->computeExpiration(new broker::ExpiryPolicy);
+ Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ m.computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
}
@@ -706,7 +203,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) {
QPID_AUTO_TEST_CASE(testQueueCleaner) {
Timer timer;
QueueRegistry queues;
- Queue::shared_ptr queue = queues.declare("my-queue").first;
+ Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
addMessagesToQueue(10, *queue, 200, 400);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
@@ -717,44 +214,57 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
::usleep(300*1000);
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
-
-
namespace {
- // helper for group tests
- void verifyAcquire( Queue::shared_ptr queue,
- TestConsumer::shared_ptr c,
- std::deque<QueuedMessage>& results,
- const std::string& expectedGroup,
- const int expectedId )
- {
- BOOST_CHECK(queue->dispatch(c));
- results.push_back(c->last);
- std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+int getIntProperty(const Message& message, const std::string& key)
+{
+ qpid::types::Variant v = message.getProperty(key);
+ int i(0);
+ if (!v.isVoid()) i = v;
+ return i;
+}
+// helper for group tests
+void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueueCursor>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+{
+ bool success = queue->dispatch(c);
+ BOOST_CHECK(success);
+ if (success) {
+ results.push_back(c->lastCursor);
+ std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
+ int id = getIntProperty(c->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( group, expectedGroup );
BOOST_CHECK_EQUAL( id, expectedId );
}
}
+Message createGroupMessage(int id, const std::string& group)
+{
+ qpid::types::Variant::Map properties;
+ properties["GROUP-ID"] = group;
+ properties["MY-ID"] = id;
+ return MessageUtils::createMessage(properties);
+}
+}
+
QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
//
// Verify that consumers of grouped messages own the groups once a message is acquired,
// and release the groups once all acquired messages have been dequeued or requeued
//
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
+ QueueSettings settings;
+ settings.shareGroups = 1;
+ settings.groupKey = "GROUP-ID";
+ QueueFactory factory;
+ Queue::shared_ptr queue(factory.create("my_queue", settings));
std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", groups[i]);
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(i, groups[i]));
}
// Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
@@ -768,8 +278,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
queue->consume(c1);
queue->consume(c2);
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
+ std::deque<QueueCursor> dequeMeC1;
+ std::deque<QueueCursor> dequeMeC2;
verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
@@ -828,9 +338,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
// what happens if C-2 "requeues" a-1 and a-2?
- queue->requeue( dequeMeC2.front() );
+ queue->release( dequeMeC2.front() );
dequeMeC2.pop_front();
- queue->requeue( dequeMeC2.front() );
+ queue->release( dequeMeC2.front() );
dequeMeC2.pop_front(); // now just has c-7 acquired
// Queue = a-1, a-2, b-4, b-5, c-7, c-8...
@@ -855,9 +365,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- // requeue all of C1's acquired messages, then cancel C1
+ // release all of C1's acquired messages, then cancel C1
while (!dequeMeC1.empty()) {
- queue->requeue(dequeMeC1.front());
+ queue->release(dequeMeC1.front());
dequeMeC1.pop_front();
}
queue->cancel(c1);
@@ -877,7 +387,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Owners= ---, ---, ---
TestConsumer::shared_ptr c3(new TestConsumer("C3"));
- std::deque<QueuedMessage> dequeMeC3;
+ std::deque<QueueCursor> dequeMeC3;
verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
@@ -897,11 +407,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Queue = a-2,
// Owners= ^C3,
-
- intrusive_ptr<Message> msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", "a");
- msg->insertCustomProperty("MY-ID", 9);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(9, "a"));
// Queue = a-2, a-9
// Owners= ^C3, ^C3
@@ -909,10 +415,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- msg = createMessage("e", "A");
- msg->insertCustomProperty("GROUP-ID", "b");
- msg->insertCustomProperty("MY-ID", 10);
- queue->deliver(msg);
+ queue->deliver(createGroupMessage(10, "b"));
// Queue = a-2, a-9, b-10
// Owners= ^C3, ^C3, ----
@@ -933,17 +436,17 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
// Verify that the same default group name is automatically applied to messages that
// do not specify a group name.
//
- FieldTable args;
- Queue::shared_ptr queue(new Queue("my_queue", true));
- args.setString("qpid.group_header_key", "GROUP-ID");
- args.setInt("qpid.shared_msg_group", 1);
- queue->configure(args);
+ QueueSettings settings;
+ settings.shareGroups = 1;
+ settings.groupKey = "GROUP-ID";
+ QueueFactory factory;
+ Queue::shared_ptr queue(factory.create("my_queue", settings));
for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = createMessage("e", "A");
+ qpid::types::Variant::Map properties;
// no "GROUP-ID" header
- msg->insertCustomProperty("MY-ID", i);
- queue->deliver(msg);
+ properties["MY-ID"] = i;
+ queue->deliver(MessageUtils::createMessage(properties));
}
// Queue = 0, 1, 2
@@ -956,20 +459,20 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
queue->consume(c1);
queue->consume(c2);
- std::deque<QueuedMessage> dequeMeC1;
- std::deque<QueuedMessage> dequeMeC2;
+ std::deque<QueueCursor> dequeMeC1;
+ std::deque<QueueCursor> dequeMeC2;
queue->dispatch(c1); // c1 now owns default group (acquired 0)
- dequeMeC1.push_back(c1->last);
- int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ dequeMeC1.push_back(c1->lastCursor);
+ int id = getIntProperty(c1->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( id, 0 );
bool gotOne = queue->dispatch(c2); // c2 should get nothing
BOOST_CHECK( !gotOne );
queue->dispatch(c1); // c1 now acquires 1
- dequeMeC1.push_back(c1->last);
- id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ dequeMeC1.push_back(c1->lastCursor);
+ id = getIntProperty(c1->lastMessage, "MY-ID");
BOOST_CHECK_EQUAL( id, 1 );
gotOne = queue->dispatch(c2); // c2 should still get nothing
@@ -982,7 +485,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
// now default group should be available...
queue->dispatch(c2); // c2 now owns default group (acquired 2)
- id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ id = c2->lastMessage.getProperty("MY-ID");
BOOST_CHECK_EQUAL( id, 2 );
gotOne = queue->dispatch(c1); // c1 should get nothing
@@ -992,556 +495,128 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
queue->cancel(c2);
}
-QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
-
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("queue1", true, &testStore ));
- queue1->create(args);
- Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
- queue2->create(args);
-
- intrusive_ptr<Message> msg1 = createMessage("e", "A");
-
- queue1->deliver(msg1);
- queue2->deliver(msg1);
-
- //change mode
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
- // check they don't get stored twice
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
-
- intrusive_ptr<Message> msg2 = createMessage("e", "B");
- queue1->deliver(msg2);
- queue2->deliver(msg2);
-
- queue1->clearLastNodeFailure();
- queue2->clearLastNodeFailure();
- // check only new messages get forced
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
-
- // check no failure messages are stored
- queue1->clearLastNodeFailure();
- queue2->clearLastNodeFailure();
-
- intrusive_ptr<Message> msg3 = createMessage("e", "B");
- queue1->deliver(msg3);
- queue2->deliver(msg3);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
- queue1->setLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 6u);
-
- /**
- * TODO: Fix or replace the following test which incorrectly requeues a
- * message that was never on the queue in the first place. This relied on
- * internal details not part of the queue abstraction.
-
- // check requeue 1
- intrusive_ptr<Message> msg4 = createMessage("e", "C");
- intrusive_ptr<Message> msg5 = createMessage("e", "D");
-
- framing::SequenceNumber sequence(1);
- QueuedMessage qmsg1(queue1.get(), msg4, sequence);
- QueuedMessage qmsg2(queue2.get(), msg5, ++sequence);
-
- queue1->requeue(qmsg1);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
-
- // check requeue 2
- queue2->clearLastNodeFailure();
- queue2->requeue(qmsg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 7u);
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
-
- queue2->clearLastNodeFailure();
- queue2->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 8u);
- */
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){
-/*
-simulate this:
- 1. start two nodes
- 2. create cluster durable queue and add some messages
- 3. kill one node (trigger force-persistent behaviour)
- 4. stop and recover remaining node
- 5. add another node
- 6. kill that new node again
-make sure that an attempt to re-enqueue a message does not happen which will
-result in the last man standing exiting with an error.
-
-we need to make sure that recover is safe, i.e. messages are
-not requeued to the store.
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->create(args);
-
- // check requeue 1
- intrusive_ptr<Message> msg1 = createMessage("e", "C");
- intrusive_ptr<Message> msg2 = createMessage("e", "D");
-
- queue1->recover(msg1);
-
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
- queue1->clearLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
- queue1->deliver(msg2);
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 1u);
-
-}
-
-QPID_AUTO_TEST_CASE(testLastNodeJournalError){
-/*
-simulate store exception going into last node standing
-
-*/
- TestMessageStoreOC testStore;
- client::QueueOptions args;
- // set queue mode
- args.setPersistLastNode();
-
- Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
- intrusive_ptr<Message> received;
- queue1->configure(args);
-
- // check requeue 1
- intrusive_ptr<Message> msg1 = createMessage("e", "C");
-
- queue1->deliver(msg1);
- testStore.createError();
-
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- queue1->setLastNodeFailure();
- BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
-
-}
-
-intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false)
-{
- intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable);
- if (content.size()) MessageUtils::addContent(msg, content);
- msg->setStore(&store);
- return msg;
-}
-
-QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
-
- TestMessageStoreOC testStore;
- client::QueueOptions args0; // No size policy
- client::QueueOptions args1;
- args1.setSizePolicy(FLOW_TO_DISK, 0, 1);
- client::QueueOptions args2;
- args2.setSizePolicy(FLOW_TO_DISK, 0, 2);
-
- // --- Fanout exchange bound to single transient queue -------------------------------------------------------------
-
- FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue
- Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit
- tq1->configure(args1);
- sbtFanout1.bind(tq1, "", 0);
-
- intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg01(msg01);
- sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit
- msg01->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg01->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg02(msg02);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException);
- }
- msg02->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg02->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg03(msg03);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException);
- }
- msg03->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg03->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg04(msg04);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException);
- }
- msg04->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg04->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg05(msg05);
- {
- ScopedSuppressLogging sl; // suppress expected error messages.
- BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException);
- }
- msg05->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg05->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, tq1->getMessageCount());
-
- // --- Fanout exchange bound to single durable queue ---------------------------------------------------------------
-
- FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue
- Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit
- dq2->configure(args1);
- sbdFanout2.bind(dq2, "", 0);
-
- intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg06(msg06);
- sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit
- msg06->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg06->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg07(msg07);
- sbdFanout2.route(dmsg07);
- msg07->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg07->isContentReleased(), true);
- BOOST_CHECK_EQUAL(2u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg08(msg08);
- sbdFanout2.route(dmsg08);
- msg08->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg08->isContentReleased(), true);
- BOOST_CHECK_EQUAL(3u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg09(msg09);
- sbdFanout2.route(dmsg09);
- msg09->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg09->isContentReleased(), true);
- BOOST_CHECK_EQUAL(4u, dq2->getMessageCount());
-
- intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg10(msg10);
- sbdFanout2.route(dmsg10);
- msg10->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg10->isContentReleased(), true);
- BOOST_CHECK_EQUAL(5u, dq2->getMessageCount());
-
- // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------
-
- FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues
- Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2
- dq3->configure(args2);
- mbdFanout3.bind(dq3, "", 0);
- Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1
- dq4->configure(args1);
- mbdFanout3.bind(dq4, "", 0);
- Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit
- dq5->configure(args0);
- mbdFanout3.bind(dq5, "", 0);
-
- intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg11(msg11);
- mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit
- msg11->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg11->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg12(msg12);
- mbdFanout3.route(dmsg12);
- msg12->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
- BOOST_CHECK_EQUAL(2u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(2u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(2u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg13(msg13);
- mbdFanout3.route(dmsg13);
- msg13->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg13->isContentReleased(), true);
- BOOST_CHECK_EQUAL(3u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(3u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(3u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg14(msg14);
- mbdFanout3.route(dmsg14);
- msg14->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point!
- BOOST_CHECK_EQUAL(4u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(4u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(4u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg15(msg15);
- mbdFanout3.route(dmsg15);
- msg15->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg15->isContentReleased(), true);
- BOOST_CHECK_EQUAL(5u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(5u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(5u, dq5->getMessageCount());
-
- // Bind a transient queue, this should block the release of any further messages.
- // Note: this will result in a violation of the count policy of dq3 and dq4 - but this
- // is expected until a better overall multi-queue design is implemented. Similarly
- // for the other tests in this section.
-
- Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit
- tq6->configure(args0);
- mbdFanout3.bind(tq6, "", 0);
-
- intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg16(msg16);
- mbdFanout3.route(dmsg16);
- msg16->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg16->isContentReleased(), false);
- BOOST_CHECK_EQUAL(6u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(6u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(6u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg17(msg17);
- mbdFanout3.route(dmsg17);
- msg17->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg17->isContentReleased(), false);
- BOOST_CHECK_EQUAL(7u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(7u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(7u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg18(msg18);
- mbdFanout3.route(dmsg18);
- msg18->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg18->isContentReleased(), false);
- BOOST_CHECK_EQUAL(8u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(8u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(8u, dq5->getMessageCount());
-
- intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg19(msg19);
- mbdFanout3.route(dmsg19);
- msg19->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg19->isContentReleased(), false);
- BOOST_CHECK_EQUAL(9u, dq3->getMessageCount());
- BOOST_CHECK_EQUAL(9u, dq4->getMessageCount());
- BOOST_CHECK_EQUAL(9u, dq5->getMessageCount());
-
-
- // --- Fanout exchange bound to multiple durable and transient queues ----------------------------------------------
-
- FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues
- Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit
- dq7->configure(args0);
- mbmFanout4.bind(dq7, "", 0);
- Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit
- dq8->configure(args1);
- mbmFanout4.bind(dq8, "", 0);
- Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit
- tq9->configure(args0);
- mbmFanout4.bind(tq9, "", 0);
-
- intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg20(msg20);
- mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit
- msg20->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg20->isContentReleased(), false);
- BOOST_CHECK_EQUAL(1u, dq7->getMessageCount());
- BOOST_CHECK_EQUAL(1u, dq8->getMessageCount());
- BOOST_CHECK_EQUAL(1u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content
- DeliverableMessage dmsg21(msg21);
- mbmFanout4.route(dmsg21);
- msg21->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg21->isContentReleased(), false);
- BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(2u, dq8->getMessageCount());
- BOOST_CHECK_EQUAL(2u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content
- DeliverableMessage dmsg22(msg22);
- mbmFanout4.route(dmsg22);
- msg22->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg22->isContentReleased(), false);
- BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(3u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content
- DeliverableMessage dmsg23(msg23);
- mbmFanout4.route(dmsg23);
- msg23->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg23->isContentReleased(), false);
- BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(4u, tq9->getMessageCount());
-
- intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content
- DeliverableMessage dmsg24(msg24);
- mbmFanout4.route(dmsg24);
- msg24->tryReleaseContent();
- BOOST_CHECK_EQUAL(msg24->isContentReleased(), false);
- BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit
- BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
-}
-
QPID_AUTO_TEST_CASE(testSetPositionFifo) {
Queue::shared_ptr q(new Queue("my-queue", true));
BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
for (int i = 0; i < 10; ++i)
- q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1)));
// Verify the front of the queue
TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
- BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
+ BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
+
// Verify the back of the queue
- QueuedMessage qm;
BOOST_CHECK_EQUAL(10u, q->getPosition());
- BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
- BOOST_CHECK_EQUAL("10", getContent(qm.payload));
BOOST_CHECK_EQUAL(10u, q->getMessageCount());
// Using setPosition to introduce a gap in sequence numbers.
q->setPosition(15);
BOOST_CHECK_EQUAL(10u, q->getMessageCount());
BOOST_CHECK_EQUAL(15u, q->getPosition());
- BOOST_CHECK(q->find(10, qm)); // Back of the queue
- BOOST_CHECK_EQUAL("10", getContent(qm.payload));
- q->deliver(contentMessage("16"));
- c->setPosition(9);
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));
+
+ q->seek(*c, Queue::MessagePredicate(), 9);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(10u, c->last.position);
- BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(16u, c->last.position);
- BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());
// Using setPosition to trunkcate the queue
q->setPosition(5);
BOOST_CHECK_EQUAL(5u, q->getMessageCount());
- q->deliver(contentMessage("6a"));
- c->setPosition(4);
+ q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
+ c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+ q->seek(*c, Queue::MessagePredicate(), 4);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->last.position);
- BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(6u, c->last.position);
- BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
BOOST_CHECK(!q->dispatch(c)); // No more messages.
}
QPID_AUTO_TEST_CASE(testSetPositionLvq) {
- Queue::shared_ptr q(new Queue("my-queue", true));
+ QueueSettings settings;
string key="key";
- framing::FieldTable args;
- args.setString("qpid.last_value_queue_key", "key");
- q->configure(args);
+ settings.lvqKey = key;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
const char* values[] = { "a", "b", "c", "a", "b", "c" };
for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
- intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
- m->insertCustomProperty(key, values[i]);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties[key] = values[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
BOOST_CHECK_EQUAL(3u, q->getMessageCount());
// Verify the front of the queue
TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
- BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
+ BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
// Verify the back of the queue
- QueuedMessage qm;
BOOST_CHECK_EQUAL(6u, q->getPosition());
- BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
- BOOST_CHECK_EQUAL("6", getContent(qm.payload));
q->setPosition(5);
- c->setPosition(4);
+
+ c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
+ q->seek(*c, Queue::MessagePredicate(), 4);
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
BOOST_CHECK(!q->dispatch(c));
}
QPID_AUTO_TEST_CASE(testSetPositionPriority) {
- Queue::shared_ptr q(new Queue("my-queue", true));
- framing::FieldTable args;
- args.setInt("qpid.priorities", 10);
- q->configure(args);
+ QueueSettings settings;
+ settings.priorities = 10;
+ QueueFactory factory;
+ Queue::shared_ptr q(factory.create("my-queue", settings));
const int priorities[] = { 1, 2, 3, 2, 1, 3 };
for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
- intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
- m->getFrames().getHeaders()->get<DeliveryProperties>(true)
- ->setPriority(priorities[i]);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties["priority"] = priorities[i];
+ q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
}
// Truncation removes messages in fifo order, not priority order.
q->setPosition(3);
- TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
BOOST_CHECK(!q->dispatch(c));
- intrusive_ptr<Message> m = contentMessage("4a");
- m->getFrames().getHeaders()->get<DeliveryProperties>(true)
- ->setPriority(4);
- q->deliver(m);
+ qpid::types::Variant::Map properties;
+ properties["priority"] = 4;
+ q->deliver(MessageUtils::createMessage(properties, "4a"));
+
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position);
- BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
// But consumers see priority order
c.reset(new TestConsumer("test", true));
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->last.position);
- BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->last.position);
- BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->last.position);
- BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->last.position);
- BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
+ BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
}
QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/ReplicationTest.cpp b/qpid/cpp/src/tests/ReplicationTest.cpp
deleted file mode 100644
index 055f06579f..0000000000
--- a/qpid/cpp/src/tests/ReplicationTest.cpp
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "unit_test.h"
-#include "test_tools.h"
-#include "config.h"
-#include "BrokerFixture.h"
-
-#include "qpid/Plugin.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/client/QueueOptions.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/replication/constants.h"
-#include "qpid/sys/Shlib.h"
-
-#include <string>
-#include <sstream>
-#include <vector>
-
-#include <boost/assign.hpp>
-#include <boost/bind.hpp>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::replication::constants;
-using boost::assign::list_of;
-
-namespace qpid {
-namespace tests {
-
-QPID_AUTO_TEST_SUITE(ReplicationTestSuite)
-
-// FIXME aconway 2009-11-26: clean this up.
-// The CMake-based build passes in the module suffix; if it's not there, this
-// is a Linux/UNIX libtool-based build.
-#if defined (QPID_MODULE_PREFIX) && defined (QPID_MODULE_SUFFIX)
-static const char *default_shlib =
- QPID_MODULE_PREFIX "replicating_listener" QPID_MODULE_POSTFIX QPID_MODULE_SUFFIX;
-#else
-static const char *default_shlib = ".libs/replicating_listener.so";
-#endif
-qpid::sys::Shlib plugin(getLibPath("REPLICATING_LISTENER_LIB", default_shlib));
-
-qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args)
-{
- std::vector<const char*> argv(args.size());
- transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
-
- qpid::broker::Broker::Options opts;
- qpid::Plugin::addOptions(opts);
- opts.parse(argv.size(), &argv[0], "", true);
- return opts;
-}
-
-QPID_AUTO_TEST_CASE(testReplicationExchange)
-{
- qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<std::string>("qpidd")
- ("--replication-exchange-name=qpid.replication")));
- SessionFixture f(brokerOpts);
-
-
- std::string dataQ("queue-1");
- std::string eventQ("event-queue-1");
- std::string dataQ2("queue-2");
- std::string eventQ2("event-queue-2");
- FieldTable eventQopts;
- eventQopts.setString("qpid.insert_sequence_numbers", REPLICATION_EVENT_SEQNO);
-
- f.session.queueDeclare(arg::queue=eventQ, arg::exclusive=true, arg::autoDelete=true, arg::arguments=eventQopts);
- f.session.exchangeBind(arg::exchange="qpid.replication", arg::queue=eventQ, arg::bindingKey=dataQ);
-
- f.session.queueDeclare(arg::queue=eventQ2, arg::exclusive=true, arg::autoDelete=true, arg::arguments=eventQopts);
- f.session.exchangeBind(arg::exchange="qpid.replication", arg::queue=eventQ2, arg::bindingKey=dataQ2);
-
- QueueOptions args;
- args.enableQueueEvents(false);
- f.session.queueDeclare(arg::queue=dataQ, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- f.session.queueDeclare(arg::queue=dataQ2, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- for (int i = 0; i < 10; i++) {
- f.session.messageTransfer(arg::content=Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), dataQ));
- f.session.messageTransfer(arg::content=Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), dataQ2));
- }
- Message msg;
- LocalQueue incoming;
- Subscription sub = f.subs.subscribe(incoming, dataQ);
- for (int i = 0; i < 10; i++) {
- BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
- }
- BOOST_CHECK(!f.subs.get(msg, dataQ));
-
- sub.cancel();
- sub = f.subs.subscribe(incoming, eventQ);
- //check that we received enqueue events for first queue:
- for (int i = 0; i < 10; i++) {
- BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), ENQUEUE);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+1));
- BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
- }
- //check that we received dequeue events for first queue:
- for (int i = 0; i < 10; i++) {
- BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), DEQUEUE);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(DEQUEUED_MESSAGE_POSITION), (i+1));
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+11));
- }
-
- sub.cancel();
- sub = f.subs.subscribe(incoming, eventQ2);
- //check that we received enqueue events for second queue:
- for (int i = 0; i < 10; i++) {
- BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ2);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), ENQUEUE);
- BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+1));
- BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
- }
-}
-
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h
index 72cb50cd21..bf21104f70 100644
--- a/qpid/cpp/src/tests/TxMocks.h
+++ b/qpid/cpp/src/tests/TxMocks.h
@@ -119,8 +119,6 @@ public:
assertEqualVector(expected, actual);
}
- void accept(TxOpConstVisitor&) const {}
-
~MockTxOp(){}
};
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
deleted file mode 100644
index a636646035..0000000000
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/broker/RecoveryManager.h"
-#include "qpid/broker/TxPublish.h"
-#include "unit_test.h"
-#include <iostream>
-#include <list>
-#include <vector>
-#include "MessageUtils.h"
-#include "TestMessageStore.h"
-
-using std::list;
-using std::pair;
-using std::vector;
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace tests {
-
-struct TxPublishTest
-{
-
- TestMessageStore store;
- Queue::shared_ptr queue1;
- Queue::shared_ptr queue2;
- intrusive_ptr<Message> msg;
- TxPublish op;
-
- TxPublishTest() :
- queue1(new Queue("queue1", false, &store, 0)),
- queue2(new Queue("queue2", false, &store, 0)),
- msg(MessageUtils::createMessage("exchange", "routing_key", true)),
- op(msg)
- {
- op.deliverTo(queue1);
- op.deliverTo(queue2);
- }
-};
-
-
-QPID_AUTO_TEST_SUITE(TxPublishTestSuite)
-
-QPID_AUTO_TEST_CASE(testPrepare)
-{
- TxPublishTest t;
-
- intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(t.msg);
- //ensure messages are enqueued in store
- t.op.prepare(0);
- BOOST_CHECK_EQUAL((size_t) 2, t.store.enqueued.size());
- BOOST_CHECK_EQUAL(std::string("queue1"), t.store.enqueued[0].first);
- BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
- BOOST_CHECK_EQUAL(std::string("queue2"), t.store.enqueued[1].first);
- BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
- BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
-}
-
-QPID_AUTO_TEST_CASE(testCommit)
-{
- TxPublishTest t;
-
- //ensure messages are delivered to queue
- t.op.prepare(0);
- t.op.commit();
- BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
- intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
-
- BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete());
- BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
-
- BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
- BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload);
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 246b0ed423..310ef844bd 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -583,15 +583,7 @@ class ReplicationTests(BrokerTest):
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
-
- # FIXME aconway 2012-02-22: there is a bug in priority ring
- # queues that allows a low priority message to displace a high
- # one. The following commented-out assert_browse is for the
- # correct result, the uncommented one is for the actualy buggy
- # result. See https://issues.apache.org/jira/browse/QPID-3866
- #
- # expect = sorted(priorities,reverse=True)[0:5]
- expect = [9,9,9,9,2]
+ expect = sorted(priorities,reverse=True)[0:5]
primary.assert_browse("q", expect, transform=lambda m: m.priority)
backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
index 257e77b6b4..83f6a5e4b1 100644
--- a/qpid/cpp/src/tests/test_store.cpp
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -34,6 +34,7 @@
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
#include "qpid/Plugin.h"
@@ -95,7 +96,7 @@ class TestStore : public NullMessageStore {
const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& )
{
- Message* msg = dynamic_cast<Message*>(pmsg.get());
+ qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
assert(msg);
// Dump the message if there is a dump file.