diff options
| author | Gordon Sim <gsim@apache.org> | 2012-08-10 12:04:27 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2012-08-10 12:04:27 +0000 |
| commit | df36b35eb7ca20c3b354d6895004fb201346482b (patch) | |
| tree | 357d90752f44304284639014f3b9db0cae1f2b2b /qpid/cpp/src/tests | |
| parent | 798cebf0e4f41953eb542d6358e5f0eea33d85a7 (diff) | |
| download | qpid-python-df36b35eb7ca20c3b354d6895004fb201346482b.tar.gz | |
QPID-4178: broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1371676 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/DeliveryRecordTest.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/ExchangeTest.cpp | 80 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessageBuilderTest.cpp | 190 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessageTest.cpp | 59 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessageUtils.h | 61 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueDepth.cpp | 105 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueEvents.cpp | 238 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 38 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueuePolicyTest.cpp | 135 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueRegistryTest.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 1267 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/ReplicationTest.cpp | 144 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/TxMocks.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/TxPublishTest.cpp | 98 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/test_store.cpp | 3 |
19 files changed, 414 insertions, 2053 deletions
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 9a77514f5f..f88c0eb58e 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -126,6 +126,7 @@ set(unit_tests_to_build ExchangeTest HeadersExchangeTest MessageTest + QueueDepth QueueRegistryTest QueuePolicyTest QueueFlowLimitTest @@ -135,16 +136,12 @@ set(unit_tests_to_build TimerTest TopicExchangeTest TxBufferTest - TxPublishTest - MessageBuilderTest ManagementTest MessageReplayTracker ConsoleTest - QueueEvents ProxyTest RetryList FrameDecoder - ReplicationTest ClientMessageTest PollableCondition Variant diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp index 1905219bf2..1f07d2b83f 100644 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -621,7 +621,7 @@ QPID_AUTO_TEST_CASE(testQueueDeleted) fix.session.queueDeclare(arg::queue="my-queue"); LocalQueue queue; fix.subs.subscribe(queue, "my-queue"); - + ScopedSuppressLogging sl; fix.session.queueDelete(arg::queue="my-queue"); BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp index fb7bd2f727..c83bd9a6a4 100644 --- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp +++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp @@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort) list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - DeliveryRecord r(QueuedMessage(0), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); + DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false); r.setId(*i); records.push_back(r); } diff --git a/qpid/cpp/src/tests/ExchangeTest.cpp b/qpid/cpp/src/tests/ExchangeTest.cpp index 66a16b9178..4f18b91b5a 100644 --- a/qpid/cpp/src/tests/ExchangeTest.cpp +++ b/qpid/cpp/src/tests/ExchangeTest.cpp @@ -35,7 +35,6 @@ using std::string; -using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; @@ -62,11 +61,9 @@ QPID_AUTO_TEST_CASE(testMe) queue.reset(); queue2.reset(); - intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "abc", false, "id")); - DeliverableMessage msg(msgPtr); + DeliverableMessage msg(MessageUtils::createMessage("exchange", "abc"), 0); topic.route(msg); direct.route(msg); - } QPID_AUTO_TEST_CASE(testIsBound) @@ -170,16 +167,6 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare) BOOST_CHECK_EQUAL(string("direct"), response.first->getType()); } -intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) { - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - return msg; -} - QPID_AUTO_TEST_CASE(testSequenceOptions) { FieldTable args; @@ -189,46 +176,35 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { DirectExchange direct("direct1", false, args); - intrusive_ptr<Message> msg1 = cmessage("e", "abc"); - intrusive_ptr<Message> msg2 = cmessage("e", "abc"); - intrusive_ptr<Message> msg3 = cmessage("e", "abc"); - - DeliverableMessage dmsg1(msg1); - DeliverableMessage dmsg2(msg2); - DeliverableMessage dmsg3(msg3); + DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg2(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg3(MessageUtils::createMessage("e", "abc"), 0); - direct.route(dmsg1); - direct.route(dmsg2); - direct.route(dmsg3); + direct.route(msg1); + direct.route(msg2); + direct.route(msg3); - BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(1, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); + BOOST_CHECK_EQUAL(2, msg2.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); + BOOST_CHECK_EQUAL(3, msg3.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); FanOutExchange fanout("fanout1", false, args); HeadersExchange header("headers1", false, args); TopicExchange topic ("topic1", false, args); // check other exchanges, that they preroute - intrusive_ptr<Message> msg4 = cmessage("e", "abc"); - intrusive_ptr<Message> msg5 = cmessage("e", "abc"); + DeliverableMessage msg4(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg5(MessageUtils::createMessage("e", "abc"), 0); + DeliverableMessage msg6(MessageUtils::createMessage("e", "abc"), 0); - // Need at least empty header for the HeadersExchange to route at all - msg5->insertCustomProperty("", ""); - intrusive_ptr<Message> msg6 = cmessage("e", "abc"); + fanout.route(msg4); + BOOST_CHECK_EQUAL(1, msg4.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); - DeliverableMessage dmsg4(msg4); - DeliverableMessage dmsg5(msg5); - DeliverableMessage dmsg6(msg6); + header.route(msg5); + BOOST_CHECK_EQUAL(1, msg5.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); - fanout.route(dmsg4); - BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - - header.route(dmsg5); - BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); - - topic.route(dmsg6); - BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + topic.route(msg6); + BOOST_CHECK_EQUAL(1, msg6.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); direct.encode(buffer); } { @@ -237,11 +213,10 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) buffer.reset(); DirectExchange::shared_ptr exch_dec = Exchange::decode(exchanges, buffer); - intrusive_ptr<Message> msg1 = cmessage("e", "abc"); - DeliverableMessage dmsg1(msg1); - exch_dec->route(dmsg1); + DeliverableMessage msg1(MessageUtils::createMessage("e", "abc"), 0); + exch_dec->route(msg1); - BOOST_CHECK_EQUAL(4, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(4, msg1.getMessage().getAnnotation("qpid.msg_sequence").asInt64()); } delete [] buff; @@ -256,9 +231,11 @@ QPID_AUTO_TEST_CASE(testIVEOption) HeadersExchange header("headers1", false, args); TopicExchange topic ("topic1", false, args); - intrusive_ptr<Message> msg1 = cmessage("direct1", "abc"); - msg1->insertCustomProperty("a", "abc"); - DeliverableMessage dmsg1(msg1); + qpid::types::Variant::Map properties; + properties["routing-key"] = "abc"; + properties["a"] = "abc"; + Message msg1 = MessageUtils::createMessage(properties, "my-message", "direct1"); + DeliverableMessage dmsg1(msg1, 0); FieldTable args2; args2.setString("x-match", "any"); @@ -273,8 +250,6 @@ QPID_AUTO_TEST_CASE(testIVEOption) Queue::shared_ptr queue2(new Queue("queue2", true)); Queue::shared_ptr queue3(new Queue("queue3", true)); - BOOST_CHECK(HeadersExchange::match(args2, msg1->getProperties<MessageProperties>()->getApplicationHeaders())); - BOOST_CHECK(direct.bind(queue, "abc", 0)); BOOST_CHECK(fanout.bind(queue1, "abc", 0)); BOOST_CHECK(header.bind(queue2, "", &args2)); @@ -287,7 +262,6 @@ QPID_AUTO_TEST_CASE(testIVEOption) } - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index b04ec6b43e..f9eed9270b 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -96,6 +96,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ExchangeTest.cpp \ HeadersExchangeTest.cpp \ MessageTest.cpp \ + QueueDepth.cpp \ QueueRegistryTest.cpp \ QueuePolicyTest.cpp \ QueueFlowLimitTest.cpp \ @@ -105,19 +106,15 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ TimerTest.cpp \ TopicExchangeTest.cpp \ TxBufferTest.cpp \ - TxPublishTest.cpp \ - MessageBuilderTest.cpp \ ConnectionOptions.h \ ForkedBroker.h \ ForkedBroker.cpp \ ManagementTest.cpp \ MessageReplayTracker.cpp \ ConsoleTest.cpp \ - QueueEvents.cpp \ ProxyTest.cpp \ RetryList.cpp \ FrameDecoder.cpp \ - ReplicationTest.cpp \ ClientMessageTest.cpp \ PollableCondition.cpp \ Variant.cpp \ diff --git a/qpid/cpp/src/tests/MessageBuilderTest.cpp b/qpid/cpp/src/tests/MessageBuilderTest.cpp deleted file mode 100644 index 9adb133d40..0000000000 --- a/qpid/cpp/src/tests/MessageBuilderTest.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/Message.h" -#include "qpid/broker/MessageBuilder.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/framing/frame_functors.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/TypeFilter.h" -#include "unit_test.h" -#include <list> - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid { -namespace tests { - -class MockMessageStore : public NullMessageStore -{ - enum Op {STAGE=1, APPEND=2}; - - uint64_t id; - boost::intrusive_ptr<PersistableMessage> expectedMsg; - std::string expectedData; - std::list<Op> ops; - - void checkExpectation(Op actual) - { - BOOST_CHECK_EQUAL(ops.front(), actual); - ops.pop_front(); - } - - public: - MockMessageStore() : id(0), expectedMsg(0) {} - - void expectStage(PersistableMessage& msg) - { - expectedMsg = &msg; - ops.push_back(STAGE); - } - - void expectAppendContent(PersistableMessage& msg, const std::string& data) - { - expectedMsg = &msg; - expectedData = data; - ops.push_back(APPEND); - } - - void stage(const boost::intrusive_ptr<PersistableMessage>& msg) - { - checkExpectation(STAGE); - BOOST_CHECK_EQUAL(expectedMsg, msg); - msg->setPersistenceId(++id); - } - - void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, - const std::string& data) - { - checkExpectation(APPEND); - BOOST_CHECK_EQUAL(boost::static_pointer_cast<const PersistableMessage>(expectedMsg), msg); - BOOST_CHECK_EQUAL(expectedData, data); - } - - bool expectationsMet() - { - return ops.empty(); - } - - //don't treat this store as a null impl - bool isNull() const - { - return false; - } - -}; - -QPID_AUTO_TEST_SUITE(MessageBuilderTestSuite) - -QPID_AUTO_TEST_CASE(testHeaderOnly) -{ - MessageBuilder builder(0); - builder.start(SequenceNumber()); - - std::string exchange("builder-exchange"); - std::string key("builder-exchange"); - - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - - header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(0); - header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); - - builder.handle(method); - builder.handle(header); - - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK_EQUAL(exchange, builder.getMessage()->getExchangeName()); - BOOST_CHECK_EQUAL(key, builder.getMessage()->getRoutingKey()); - BOOST_CHECK(builder.getMessage()->getFrames().isComplete()); -} - -QPID_AUTO_TEST_CASE(test1ContentFrame) -{ - MessageBuilder builder(0); - builder.start(SequenceNumber()); - - std::string data("abcdefg"); - std::string exchange("builder-exchange"); - std::string key("builder-exchange"); - - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody(data))); - method.setEof(false); - header.setBof(false); - header.setEof(false); - content.setBof(false); - - header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size()); - header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); - - builder.handle(method); - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK(!builder.getMessage()->getFrames().isComplete()); - - builder.handle(header); - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK(!builder.getMessage()->getFrames().isComplete()); - - builder.handle(content); - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK(builder.getMessage()->getFrames().isComplete()); -} - -QPID_AUTO_TEST_CASE(test2ContentFrames) -{ - MessageBuilder builder(0); - builder.start(SequenceNumber()); - - std::string data1("abcdefg"); - std::string data2("hijklmn"); - std::string exchange("builder-exchange"); - std::string key("builder-exchange"); - - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content1((AMQContentBody(data1))); - AMQFrame content2((AMQContentBody(data2))); - method.setEof(false); - header.setBof(false); - header.setEof(false); - content1.setBof(false); - content1.setEof(false); - content2.setBof(false); - - header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); - header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); - - builder.handle(method); - builder.handle(header); - builder.handle(content1); - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK(!builder.getMessage()->getFrames().isComplete()); - - builder.handle(content2); - BOOST_CHECK(builder.getMessage()); - BOOST_CHECK(builder.getMessage()->getFrames().isComplete()); -} -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/MessageTest.cpp b/qpid/cpp/src/tests/MessageTest.cpp index 3a3ed061f9..fe670a274e 100644 --- a/qpid/cpp/src/tests/MessageTest.cpp +++ b/qpid/cpp/src/tests/MessageTest.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/Uuid.h" +#include "MessageUtils.h" #include "unit_test.h" @@ -43,49 +44,29 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) { string exchange = "MyExchange"; string routingKey = "MyRoutingKey"; + uint64_t ttl(60); Uuid messageId(true); - string data1("abcdefg"); - string data2("hijklmn"); + string data("abcdefghijklmn"); - boost::intrusive_ptr<Message> msg(new Message()); + qpid::types::Variant::Map properties; + properties["routing-key"] = routingKey; + properties["ttl"] = ttl; + properties["durable"] = true; + properties["message-id"] = qpid::types::Uuid(messageId.data()); + properties["abc"] = "xyz"; + Message msg = MessageUtils::createMessage(properties, data); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content1((AMQContentBody(data1))); - AMQFrame content2((AMQContentBody(data2))); + std::string buffer; + encode(msg, buffer); + msg = Message(); + decode(buffer, msg); - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().append(content1); - msg->getFrames().append(content2); - - MessageProperties* mProps = msg->getFrames().getHeaders()->get<MessageProperties>(true); - mProps->setContentLength(data1.size() + data2.size()); - mProps->setMessageId(messageId); - FieldTable applicationHeaders; - applicationHeaders.setString("abc", "xyz"); - mProps->setApplicationHeaders(applicationHeaders); - DeliveryProperties* dProps = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - dProps->setRoutingKey(routingKey); - dProps->setDeliveryMode(PERSISTENT); - BOOST_CHECK(msg->isPersistent()); - - std::vector<char> buff(msg->encodedSize()); - Buffer wbuffer(&buff[0], msg->encodedSize()); - msg->encode(wbuffer); - - Buffer rbuffer(&buff[0], msg->encodedSize()); - msg = new Message(); - msg->decodeHeader(rbuffer); - msg->decodeContent(rbuffer); - BOOST_CHECK_EQUAL(exchange, msg->getExchangeName()); - BOOST_CHECK_EQUAL(routingKey, msg->getRoutingKey()); - BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->contentSize()); - BOOST_CHECK_EQUAL((uint64_t) data1.size() + data2.size(), msg->getProperties<MessageProperties>()->getContentLength()); - BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); - BOOST_CHECK_EQUAL(string("xyz"), msg->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("abc")); - BOOST_CHECK_EQUAL((uint8_t) PERSISTENT, msg->getProperties<DeliveryProperties>()->getDeliveryMode()); - BOOST_CHECK(msg->isPersistent()); + BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); + BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize()); + BOOST_CHECK_EQUAL(data, msg.getContent()); + //BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId()); + BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc")); + BOOST_CHECK(msg.isPersistent()); } QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h index 991e2a2714..c2eabd804d 100644 --- a/qpid/cpp/src/tests/MessageUtils.h +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -20,9 +20,11 @@ */ #include "qpid/broker/Message.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" +#include "qpid/types/Variant.h" using namespace qpid; using namespace broker; @@ -33,11 +35,46 @@ namespace tests { struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="", - const bool durable = false, const Uuid& messageId=Uuid(true), - uint64_t contentSize = 0) + static Message createMessage(const qpid::types::Variant::Map& properties, const std::string& content="", const std::string& destination = "") { - boost::intrusive_ptr<broker::Message> msg(new broker::Message()); + boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer()); + + AMQFrame method(( MessageTransferBody(ProtocolVersion(), destination, 0, 0))); + AMQFrame header((AMQHeaderBody())); + + msg->getFrames().append(method); + msg->getFrames().append(header); + if (content.size()) { + msg->getFrames().getHeaders()->get<MessageProperties>(true)->setContentLength(content.size()); + AMQFrame data((AMQContentBody(content))); + msg->getFrames().append(data); + } + for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + if (i->first == "routing-key" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(i->second); + } else if (i->first == "message-id" && !i->second.isVoid()) { + qpid::types::Uuid id = i->second; + qpid::framing::Uuid id2(id.data()); + msg->getFrames().getHeaders()->get<MessageProperties>(true)->setMessageId(id2); + } else if (i->first == "ttl" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(i->second); + } else if (i->first == "priority" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setPriority(i->second); + } else if (i->first == "durable" && !i->second.isVoid()) { + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(i->second.asBool() ? 2 : 1); + } else { + msg->getFrames().getHeaders()->get<MessageProperties>(true)->getApplicationHeaders().setString(i->first, i->second); + } + } + return Message(msg, msg); + } + + + static Message createMessage(const std::string& exchange="", const std::string& routingKey="", + uint64_t ttl = 0, bool durable = false, const Uuid& messageId=Uuid(true), + const std::string& content="") + { + boost::intrusive_ptr<broker::amqp_0_10::MessageTransfer> msg(new broker::amqp_0_10::MessageTransfer()); AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); @@ -45,18 +82,18 @@ struct MessageUtils msg->getFrames().append(method); msg->getFrames().append(header); MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(contentSize); + props->setContentLength(content.size()); props->setMessageId(messageId); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); if (durable) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); - return msg; - } - - static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data) - { - AMQFrame content((AMQContentBody(data))); - msg->getFrames().append(content); + if (ttl) + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); + if (content.size()) { + AMQFrame data((AMQContentBody(content))); + msg->getFrames().append(data); + } + return Message(msg, msg); } }; diff --git a/qpid/cpp/src/tests/QueueDepth.cpp b/qpid/cpp/src/tests/QueueDepth.cpp new file mode 100644 index 0000000000..73556141ca --- /dev/null +++ b/qpid/cpp/src/tests/QueueDepth.cpp @@ -0,0 +1,105 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/QueueDepth.h" + +#include "unit_test.h" + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(QueueDepthTestSuite) + +using namespace qpid::broker; + +QPID_AUTO_TEST_CASE(testCompare) +{ + QueueDepth a(0, 0); + QueueDepth b(1, 1); + QueueDepth c(2, 2); + QueueDepth d(1, 1); + + BOOST_CHECK(a < b); + BOOST_CHECK(b < c); + BOOST_CHECK(a < c); + + BOOST_CHECK(b > a); + BOOST_CHECK(c > b); + BOOST_CHECK(c > a); + + BOOST_CHECK(b == d); + BOOST_CHECK(d == b); + BOOST_CHECK(a != b); + BOOST_CHECK(b != a); + + QueueDepth e; e.setCount(1); + QueueDepth f; f.setCount(2); + BOOST_CHECK(e < f); + BOOST_CHECK(f > e); + + QueueDepth g; g.setSize(1); + QueueDepth h; h.setSize(2); + BOOST_CHECK(g < h); + BOOST_CHECK(h > g); +} + +QPID_AUTO_TEST_CASE(testIncrement) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + QueueDepth c(8, 16); + a += b; + BOOST_CHECK(a == c); + BOOST_CHECK_EQUAL(8, a.getCount()); + BOOST_CHECK_EQUAL(16, a.getSize()); +} + +QPID_AUTO_TEST_CASE(testDecrement) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + QueueDepth c(2, 4); + a -= b; + BOOST_CHECK(a == c); + BOOST_CHECK_EQUAL(2, a.getCount()); + BOOST_CHECK_EQUAL(4, a.getSize()); +} + +QPID_AUTO_TEST_CASE(testAddition) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + + QueueDepth c = a + b; + BOOST_CHECK_EQUAL(8, c.getCount()); + BOOST_CHECK_EQUAL(16, c.getSize()); +} + +QPID_AUTO_TEST_CASE(testSubtraction) +{ + QueueDepth a(5, 10); + QueueDepth b(3, 6); + + QueueDepth c = a - b; + BOOST_CHECK_EQUAL(2, c.getCount()); + BOOST_CHECK_EQUAL(4, c.getSize()); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueEvents.cpp b/qpid/cpp/src/tests/QueueEvents.cpp deleted file mode 100644 index cea8bbf0db..0000000000 --- a/qpid/cpp/src/tests/QueueEvents.cpp +++ /dev/null @@ -1,238 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MessageUtils.h" -#include "unit_test.h" -#include "BrokerFixture.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/QueueEvents.h" -#include "qpid/client/QueueOptions.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/sys/Dispatcher.h" -#include <boost/bind.hpp> -#include <boost/format.hpp> - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(QueueEventsSuite) - -using namespace qpid::client; -using namespace qpid::broker; -using namespace qpid::sys; -using qpid::framing::SequenceNumber; - -struct EventChecker -{ - typedef std::deque<QueueEvents::Event> Events; - - Events events; - boost::shared_ptr<Poller> poller; - - void handle(QueueEvents::Event e) - { - if (events.empty()) { - BOOST_FAIL("Unexpected event received"); - } else { - BOOST_CHECK_EQUAL(events.front().type, e.type); - BOOST_CHECK_EQUAL(events.front().msg.queue, e.msg.queue); - BOOST_CHECK_EQUAL(events.front().msg.payload, e.msg.payload); - BOOST_CHECK_EQUAL(events.front().msg.position, e.msg.position); - events.pop_front(); - } - if (events.empty() && poller) poller->shutdown(); - } - - void expect(QueueEvents::Event e) - { - events.push_back(e); - } -}; - -QPID_AUTO_TEST_CASE(testBasicEventProcessing) -{ - boost::shared_ptr<Poller> poller(new Poller()); - sys::Dispatcher dispatcher(poller); - Thread dispatchThread(dispatcher); - QueueEvents events(poller); - EventChecker listener; - listener.poller = poller; - events.registerListener("dummy", boost::bind(&EventChecker::handle, &listener, _1)); - //signal occurence of some events: - Queue queue("queue1"); - SequenceNumber id; - QueuedMessage event1(&queue, MessageUtils::createMessage(), id); - QueuedMessage event2(&queue, MessageUtils::createMessage(), ++id); - - //define events expected by listener: - listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event1)); - listener.expect(QueueEvents::Event(QueueEvents::ENQUEUE, event2)); - listener.expect(QueueEvents::Event(QueueEvents::DEQUEUE, event1)); - - events.enqueued(event1); - events.enqueued(event2); - events.dequeued(event1); - - dispatchThread.join(); - events.shutdown(); - events.unregisterListener("dummy"); -} - - -struct EventRecorder -{ - struct EventRecord - { - QueueEvents::EventType type; - std::string queue; - std::string content; - SequenceNumber position; - }; - - typedef std::deque<EventRecord> Events; - - Events events; - - void handle(QueueEvents::Event event) - { - EventRecord record; - record.type = event.type; - record.queue = event.msg.queue->getName(); - event.msg.payload->getFrames().getContent(record.content); - record.position = event.msg.position; - events.push_back(record); - } - - void check(QueueEvents::EventType type, const std::string& queue, const std::string& content, const SequenceNumber& position) - { - if (events.empty()) { - BOOST_FAIL("Missed event"); - } else { - BOOST_CHECK_EQUAL(events.front().type, type); - BOOST_CHECK_EQUAL(events.front().queue, queue); - BOOST_CHECK_EQUAL(events.front().content, content); - BOOST_CHECK_EQUAL(events.front().position, position); - events.pop_front(); - } - } - void checkEnqueue(const std::string& queue, const std::string& data, const SequenceNumber& position) - { - check(QueueEvents::ENQUEUE, queue, data, position); - } - - void checkDequeue(const std::string& queue, const std::string& data, const SequenceNumber& position) - { - check(QueueEvents::DEQUEUE, queue, data, position); - } -}; - -QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing) -{ - SessionFixture fixture; - //register dummy event listener to broker - EventRecorder listener; - fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); - - //declare queue with event options specified - QueueOptions options; - options.enableQueueEvents(false); - std::string q("queue-events-test"); - fixture.session.queueDeclare(arg::queue=q, arg::arguments=options); - //send and consume some messages - LocalQueue incoming; - Subscription sub = fixture.subs.subscribe(incoming, q); - for (int i = 0; i < 5; i++) { - fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); - } - for (int i = 0; i < 3; i++) { - BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); - } - for (int i = 5; i < 10; i++) { - fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); - } - for (int i = 3; i < 10; i++) { - BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); - } - fixture.connection.close(); - fixture.broker->getQueueEvents().shutdown(); - - //check listener was notified of all events, and in correct order - SequenceNumber enqueueId(1); - SequenceNumber dequeueId(1); - for (int i = 0; i < 5; i++) { - listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); - } - for (int i = 0; i < 3; i++) { - listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++); - } - for (int i = 5; i < 10; i++) { - listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); - } - for (int i = 3; i < 10; i++) { - listener.checkDequeue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), dequeueId++); - } -} - -QPID_AUTO_TEST_CASE(testSystemLevelEventProcessing_enqueuesOnly) -{ - SessionFixture fixture; - //register dummy event listener to broker - EventRecorder listener; - fixture.broker->getQueueEvents().registerListener("recorder", boost::bind(&EventRecorder::handle, &listener, _1)); - - //declare queue with event options specified - QueueOptions options; - options.enableQueueEvents(true); - std::string q("queue-events-test"); - fixture.session.queueDeclare(arg::queue=q, arg::arguments=options); - //send and consume some messages - LocalQueue incoming; - Subscription sub = fixture.subs.subscribe(incoming, q); - for (int i = 0; i < 5; i++) { - fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); - } - for (int i = 0; i < 3; i++) { - BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); - } - for (int i = 5; i < 10; i++) { - fixture.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); - } - for (int i = 3; i < 10; i++) { - BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str()); - } - fixture.connection.close(); - fixture.broker->getQueueEvents().shutdown(); - - //check listener was notified of all events, and in correct order - SequenceNumber enqueueId(1); - SequenceNumber dequeueId(1); - for (int i = 0; i < 5; i++) { - listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); - } - for (int i = 5; i < 10; i++) { - listener.checkEnqueue(q, (boost::format("%1%_%2%") % "Message" % (i+1)).str(), enqueueId++); - } -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp index bd868398f8..d305ca452b 100644 --- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -23,8 +23,8 @@ #include "unit_test.h" #include "test_tools.h" -#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/QueueSettings.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldValue.h" @@ -66,21 +66,19 @@ public: return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } - static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings) + static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments) { + QueueSettings settings; + settings.populate(arguments, settings.storeSettings); return QueueFlowLimit::createLimit(0, settings); } }; - - -QueuedMessage createMessage(uint32_t size) +Message createMessage(uint32_t size) { static uint32_t seqNum; - QueuedMessage msg; - msg.payload = MessageUtils::createMessage(); - msg.position = ++seqNum; - MessageUtils::addContent(msg.payload, std::string (size, 'x')); + Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x')); + msg.setSequence(++seqNum); return msg; } } @@ -100,7 +98,7 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); - std::deque<QueuedMessage> msgs; + std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); flow->enqueued(msgs.back()); @@ -135,7 +133,6 @@ QPID_AUTO_TEST_CASE(testFlowCount) BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF } - QPID_AUTO_TEST_CASE(testFlowSize) { FieldTable args; @@ -151,7 +148,7 @@ QPID_AUTO_TEST_CASE(testFlowSize) BOOST_CHECK(!flow->isFlowControlActive()); BOOST_CHECK(flow->monitorFlowControl()); - std::deque<QueuedMessage> msgs; + std::deque<Message> msgs; for (size_t i = 0; i < 6; i++) { msgs.push_back(createMessage(10)); flow->enqueued(msgs.back()); @@ -161,14 +158,14 @@ QPID_AUTO_TEST_CASE(testFlowSize) BOOST_CHECK_EQUAL(6u, flow->getFlowCount()); BOOST_CHECK_EQUAL(60u, flow->getFlowSize()); - QueuedMessage msg_9 = createMessage(9); + Message msg_9 = createMessage(9); flow->enqueued(msg_9); BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue - QueuedMessage tinyMsg_1 = createMessage(1); + Message tinyMsg_1 = createMessage(1); flow->enqueued(tinyMsg_1); BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue - QueuedMessage tinyMsg_2 = createMessage(1); + Message tinyMsg_2 = createMessage(1); flow->enqueued(tinyMsg_2); BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON msgs.push_back(createMessage(10)); @@ -233,12 +230,12 @@ QPID_AUTO_TEST_CASE(testFlowCombo) args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200); args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100); - std::deque<QueuedMessage> msgs_1; - std::deque<QueuedMessage> msgs_10; - std::deque<QueuedMessage> msgs_50; - std::deque<QueuedMessage> msgs_100; + std::deque<Message> msgs_1; + std::deque<Message> msgs_10; + std::deque<Message> msgs_50; + std::deque<Message> msgs_100; - QueuedMessage msg; + Message msg; std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0 @@ -458,7 +455,6 @@ QPID_AUTO_TEST_CASE(testFlowDisable) } } - QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/QueuePolicyTest.cpp b/qpid/cpp/src/tests/QueuePolicyTest.cpp index f735e09449..00e964602a 100644 --- a/qpid/cpp/src/tests/QueuePolicyTest.cpp +++ b/qpid/cpp/src/tests/QueuePolicyTest.cpp @@ -22,12 +22,10 @@ #include "unit_test.h" #include "test_tools.h" -#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/client/QueueOptions.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" -#include "MessageUtils.h" #include "BrokerFixture.h" using namespace qpid::broker; @@ -39,118 +37,10 @@ namespace tests { QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite) -namespace { -QueuedMessage createMessage(uint32_t size) -{ - QueuedMessage msg; - msg.payload = MessageUtils::createMessage(); - MessageUtils::addContent(msg.payload, std::string (size, 'x')); - return msg; -} -} - -QPID_AUTO_TEST_CASE(testCount) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0)); - BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); - BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); - - QueuedMessage msg = createMessage(10); - for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg.payload); - } - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on enqueuing sixth message"); - } catch (const ResourceLimitExceededException&) {} - - policy->dequeued(msg); - policy->tryEnqueue(msg.payload); - - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)"); - } catch (const ResourceLimitExceededException&) {} -} - -QPID_AUTO_TEST_CASE(testSize) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50)); - QueuedMessage msg = createMessage(10); - - for (size_t i = 0; i < 5; i++) { - policy->tryEnqueue(msg.payload); - } - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - policy->dequeued(msg); - policy->tryEnqueue(msg.payload); - - try { - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy); - } catch (const ResourceLimitExceededException&) {} -} - -QPID_AUTO_TEST_CASE(testBoth) -{ - std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50)); - try { - QueuedMessage msg = createMessage(51); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - std::vector<QueuedMessage> messages; - messages.push_back(createMessage(15)); - messages.push_back(createMessage(10)); - messages.push_back(createMessage(11)); - messages.push_back(createMessage(2)); - messages.push_back(createMessage(7)); - for (size_t i = 0; i < messages.size(); i++) { - policy->tryEnqueue(messages[i].payload); - } - //size = 45 at this point, count = 5 - try { - QueuedMessage msg = createMessage(5); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy); - } catch (const ResourceLimitExceededException&) {} - try { - QueuedMessage msg = createMessage(10); - policy->tryEnqueue(msg.payload); - BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); - } catch (const ResourceLimitExceededException&) {} - - - policy->dequeued(messages[0]); - try { - QueuedMessage msg = createMessage(20); - policy->tryEnqueue(msg.payload); - } catch (const ResourceLimitExceededException&) { - BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy); - } -} - -QPID_AUTO_TEST_CASE(testSettings) -{ - //test reading and writing the policy from/to field table - std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303)); - FieldTable settings; - a->update(settings); - std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings)); - BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount()); - BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); -} - QPID_AUTO_TEST_CASE(testRingPolicyCount) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING, 0, 5); SessionFixture f; std::string q("my-ring-queue"); @@ -183,9 +73,8 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) // Ring queue, 500 bytes maxSize - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING, 500, 0); SessionFixture f; std::string q("my-ring-queue"); @@ -255,9 +144,9 @@ QPID_AUTO_TEST_CASE(testRingPolicySize) QPID_AUTO_TEST_CASE(testStrictRingPolicy) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(RING_STRICT, 0, 5); + args.setString("qpid.flow_stop_count", "0"); SessionFixture f; std::string q("my-ring-queue"); @@ -281,9 +170,8 @@ QPID_AUTO_TEST_CASE(testStrictRingPolicy) QPID_AUTO_TEST_CASE(testPolicyWithDtx) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(REJECT, 0, 5); SessionFixture f; std::string q("my-policy-queue"); @@ -367,9 +255,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore) QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit) { - FieldTable args; - std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT); - policy->update(args); + QueueOptions args; + args.setSizePolicy(REJECT, 0, 5); SessionFixture f; std::string q("q"); diff --git a/qpid/cpp/src/tests/QueueRegistryTest.cpp b/qpid/cpp/src/tests/QueueRegistryTest.cpp index ae555539a4..364d66c525 100644 --- a/qpid/cpp/src/tests/QueueRegistryTest.cpp +++ b/qpid/cpp/src/tests/QueueRegistryTest.cpp @@ -19,6 +19,7 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueSettings.h" #include "unit_test.h" #include <string> @@ -36,33 +37,23 @@ QPID_AUTO_TEST_CASE(testDeclare) QueueRegistry reg; std::pair<Queue::shared_ptr, bool> qc; - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); Queue::shared_ptr q = qc.first; BOOST_CHECK(q); BOOST_CHECK(qc.second); // New queue BOOST_CHECK_EQUAL(foo, q->getName()); - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); BOOST_CHECK_EQUAL(q, qc.first); BOOST_CHECK(!qc.second); - qc = reg.declare(bar, false, 0, 0); + qc = reg.declare(bar, QueueSettings()); q = qc.first; BOOST_CHECK(q); BOOST_CHECK_EQUAL(true, qc.second); BOOST_CHECK_EQUAL(bar, q->getName()); } -QPID_AUTO_TEST_CASE(testDeclareTmp) -{ - QueueRegistry reg; - std::pair<Queue::shared_ptr, bool> qc; - - qc = reg.declare(std::string(), false, 0, 0); - BOOST_CHECK(qc.second); - BOOST_CHECK_EQUAL(std::string("tmp_1"), qc.first->getName()); -} - QPID_AUTO_TEST_CASE(testFind) { std::string foo("foo"); @@ -72,8 +63,8 @@ QPID_AUTO_TEST_CASE(testFind) BOOST_CHECK(reg.find(foo) == 0); - reg.declare(foo, false, 0, 0); - reg.declare(bar, false, 0, 0); + reg.declare(foo, QueueSettings()); + reg.declare(bar, QueueSettings()); Queue::shared_ptr q = reg.find(bar); BOOST_CHECK(q); BOOST_CHECK_EQUAL(bar, q->getName()); @@ -85,7 +76,7 @@ QPID_AUTO_TEST_CASE(testDestroy) QueueRegistry reg; std::pair<Queue::shared_ptr, bool> qc; - qc = reg.declare(foo, false, 0, 0); + qc = reg.declare(foo, QueueSettings()); reg.destroy(foo); // Queue is gone from the registry. BOOST_CHECK(reg.find(foo) == 0); diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index aa7132828a..3dfe3863f4 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -38,8 +38,8 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/QueueSettings.h" #include <iostream> #include <vector> @@ -56,196 +56,47 @@ using namespace qpid::sys; namespace qpid { namespace tests { - class TestConsumer : public virtual Consumer{ public: typedef boost::shared_ptr<TestConsumer> shared_ptr; - QueuedMessage last; + QueueCursor lastCursor; + Message lastMessage; bool received; - TestConsumer(std::string name="test", bool acquire = true):Consumer(name, acquire), received(false) {}; + TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {}; - virtual bool deliver(QueuedMessage& msg){ - last = msg; + virtual bool deliver(const QueueCursor& cursor, const Message& message){ + lastCursor = cursor; + lastMessage = message; received = true; return true; }; void notify() {} void cancel() {} - void acknowledged(const QueuedMessage&) {} + void acknowledged(const DeliveryRecord&) {} OwnershipToken* getSession() { return 0; } }; class FailOnDeliver : public Deliverable { - boost::intrusive_ptr<Message> msg; + Message msg; public: FailOnDeliver() : msg(MessageUtils::createMessage()) {} void deliverTo(const boost::shared_ptr<Queue>& queue) { throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); } - Message& getMessage() { return *(msg.get()); } + Message& getMessage() { return msg; } }; -intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) { - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); - AMQFrame header((AMQHeaderBody())); - msg->getFrames().append(method); - msg->getFrames().append(header); - msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - if (ttl) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setTtl(ttl); - return msg; -} - -intrusive_ptr<Message> contentMessage(string content) { - intrusive_ptr<Message> m(MessageUtils::createMessage()); - MessageUtils::addContent(m, content); - return m; -} - -string getContent(intrusive_ptr<Message> m) { - return m->getFrames().getContent(); -} - QPID_AUTO_TEST_SUITE(QueueTestSuite) -QPID_AUTO_TEST_CASE(testAsyncMessage) { - Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> received; - - TestConsumer::shared_ptr c1(new TestConsumer()); - queue->consume(c1); - - - //Test basic delivery: - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process - queue->process(msg1); - sleep(2); - - BOOST_CHECK(!c1->received); - msg1->enqueueComplete(); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg1.get(), received.get()); -} - - -QPID_AUTO_TEST_CASE(testAsyncMessageCount){ - Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - msg1->enqueueAsync(queue, 0);//this is done on enqueue which is not called from process - - queue->process(msg1); - sleep(2); - uint32_t compval=0; - BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); - msg1->enqueueComplete(); - compval=1; - BOOST_CHECK_EQUAL(compval, queue->getEnqueueCompleteMessageCount()); - BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); -} - -QPID_AUTO_TEST_CASE(testConsumers){ - Queue::shared_ptr queue(new Queue("my_queue", true)); - - //Test adding consumers: - TestConsumer::shared_ptr c1(new TestConsumer()); - TestConsumer::shared_ptr c2(new TestConsumer()); - queue->consume(c1); - queue->consume(c2); - - BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); - - //Test basic delivery: - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - - queue->deliver(msg1); - BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg1.get(), c1->last.payload.get()); - - queue->deliver(msg2); - BOOST_CHECK(queue->dispatch(c2)); - BOOST_CHECK_EQUAL(msg2.get(), c2->last.payload.get()); - - c1->received = false; - queue->deliver(msg3); - BOOST_CHECK(queue->dispatch(c1)); - BOOST_CHECK_EQUAL(msg3.get(), c1->last.payload.get()); - - //Test cancellation: - queue->cancel(c1); - BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); - queue->cancel(c2); - BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount()); -} - -QPID_AUTO_TEST_CASE(testRegistry){ - //Test use of queues in registry: - QueueRegistry registry; - registry.declare("queue1", true, true); - registry.declare("queue2", true, true); - registry.declare("queue3", true, true); - - BOOST_CHECK(registry.find("queue1")); - BOOST_CHECK(registry.find("queue2")); - BOOST_CHECK(registry.find("queue3")); - - registry.destroy("queue1"); - registry.destroy("queue2"); - registry.destroy("queue3"); - - BOOST_CHECK(!registry.find("queue1")); - BOOST_CHECK(!registry.find("queue2")); - BOOST_CHECK(!registry.find("queue3")); -} - -QPID_AUTO_TEST_CASE(testDequeue){ - Queue::shared_ptr queue(new Queue("my_queue", true)); - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - intrusive_ptr<Message> received; - - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg1.get(), received.get()); - BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg2.get(), received.get()); - BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); - - TestConsumer::shared_ptr consumer(new TestConsumer()); - queue->consume(consumer); - queue->dispatch(consumer); - if (!consumer->received) - sleep(2); - - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); - BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); - - received = queue->get().payload; - BOOST_CHECK(!received); - BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); - -} - QPID_AUTO_TEST_CASE(testBound){ //test the recording of bindings, and use of those to allow a queue to be unbound string key("my-key"); FieldTable args; - Queue::shared_ptr queue(new Queue("my-queue", true)); + Queue::shared_ptr queue(new Queue("my-queue")); ExchangeRegistry exchanges; //establish bindings from exchange->queue and notify the queue as it is bound: Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; @@ -273,423 +124,69 @@ QPID_AUTO_TEST_CASE(testBound){ exchange3->route(deliverable); } -QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ - client::QueueOptions args; - args.setPersistLastNode(); - - Queue::shared_ptr queue(new Queue("my-queue", true)); - queue->configure(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - - //enqueue 2 messages - queue->deliver(msg1); - queue->deliver(msg2); - - //change mode - queue->setLastNodeFailure(); - - //enqueue 1 message - queue->deliver(msg3); - - //check all have persistent ids. - BOOST_CHECK(msg1->isPersistent()); - BOOST_CHECK(msg2->isPersistent()); - BOOST_CHECK(msg3->isPersistent()); - -} - - -QPID_AUTO_TEST_CASE(testSeek){ - - Queue::shared_ptr queue(new Queue("my-queue", true)); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - - //enqueue 2 messages - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - TestConsumer::shared_ptr consumer(new TestConsumer("test", false)); - SequenceNumber seq(2); - consumer->setPosition(seq); - - QueuedMessage qm; - queue->dispatch(consumer); - - BOOST_CHECK_EQUAL(msg3.get(), consumer->last.payload.get()); - queue->dispatch(consumer); - queue->dispatch(consumer); // make sure over-run is safe - -} - -QPID_AUTO_TEST_CASE(testSearch){ - - Queue::shared_ptr queue(new Queue("my-queue", true)); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - - //enqueue 2 messages - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - - SequenceNumber seq(2); - QueuedMessage qm; - TestConsumer::shared_ptr c1(new TestConsumer()); - - BOOST_CHECK(queue->find(seq, qm)); +QPID_AUTO_TEST_CASE(testLVQ){ - BOOST_CHECK_EQUAL(seq.getValue(), qm.position.getValue()); - - queue->acquire(qm, c1->getName()); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); - SequenceNumber seq1(3); - QueuedMessage qm1; - BOOST_CHECK(queue->find(seq1, qm1)); - BOOST_CHECK_EQUAL(seq1.getValue(), qm1.position.getValue()); - -} -const std::string nullxid = ""; - -class SimpleDummyCtxt : public TransactionContext {}; + QueueSettings settings; + string key="key"; + settings.lvqKey = key; + QueueFactory factory; + Queue::shared_ptr q(factory.create("my-queue", settings)); -class DummyCtxt : public TPCTransactionContext -{ - const std::string xid; - public: - DummyCtxt(const std::string& _xid) : xid(_xid) {} - static std::string getXid(TransactionContext& ctxt) - { - DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt)); - return c ? c->xid : nullxid; + const char* values[] = { "a", "b", "c", "a"}; + for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { + qpid::types::Variant::Map properties; + properties[key] = values[i]; + q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); } -}; - -class TestMessageStoreOC : public MessageStore -{ - std::set<std::string> prepared; - uint64_t nextPersistenceId; - public: - - uint enqCnt; - uint deqCnt; - bool error; - - TestMessageStoreOC() : MessageStore(),nextPersistenceId(1),enqCnt(0),deqCnt(0),error(false) {} - ~TestMessageStoreOC(){} + BOOST_CHECK_EQUAL(q->getMessageCount(), 3u); - virtual void dequeue(TransactionContext*, - const boost::intrusive_ptr<PersistableMessage>& /*msg*/, - const PersistableQueue& /*queue*/) - { - if (error) throw Exception("Dequeue error test"); - deqCnt++; - } + TestConsumer::shared_ptr c(new TestConsumer("test", true)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent()); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent()); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent()); - virtual void enqueue(TransactionContext*, - const boost::intrusive_ptr<PersistableMessage>& msg, - const PersistableQueue& /* queue */) - { - if (error) throw Exception("Enqueue error test"); - enqCnt++; - msg->enqueueComplete(); - } - void createError() - { - error=true; + const char* values2[] = { "a", "b", "c"}; + for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) { + qpid::types::Variant::Map properties; + properties[key] = values[i]; + q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5))); } + BOOST_CHECK_EQUAL(q->getMessageCount(), 3u); - bool init(const Options*) { return true; } - void truncateInit(const bool) {} - void create(PersistableQueue& queue, const framing::FieldTable&) { queue.setPersistenceId(nextPersistenceId++); } - void destroy(PersistableQueue&) {} - void create(const PersistableExchange& exchange, const framing::FieldTable&) { exchange.setPersistenceId(nextPersistenceId++); } - void destroy(const PersistableExchange&) {} - void bind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} - void unbind(const PersistableExchange&, const PersistableQueue&, const std::string&, const framing::FieldTable&) {} - void create(const PersistableConfig& config) { config.setPersistenceId(nextPersistenceId++); } - void destroy(const PersistableConfig&) {} - void stage(const boost::intrusive_ptr<PersistableMessage>&) {} - void destroy(PersistableMessage&) {} - void appendContent(const boost::intrusive_ptr<const PersistableMessage>&, const std::string&) {} - void loadContent(const qpid::broker::PersistableQueue&, const boost::intrusive_ptr<const PersistableMessage>&, - std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } - void flush(const qpid::broker::PersistableQueue&) {} - uint32_t outstandingQueueAIO(const PersistableQueue&) { return 0; } - - std::auto_ptr<TransactionContext> begin() { return std::auto_ptr<TransactionContext>(new SimpleDummyCtxt()); } - std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) { return std::auto_ptr<TPCTransactionContext>(new DummyCtxt(xid)); } - void prepare(TPCTransactionContext& ctxt) { prepared.insert(DummyCtxt::getXid(ctxt)); } - void commit(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } - void abort(TransactionContext& ctxt) { prepared.erase(DummyCtxt::getXid(ctxt)); } - void collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } - - void recover(RecoveryManager&) {} -}; - - -QPID_AUTO_TEST_CASE(testLVQOrdering){ - - client::QueueOptions args; - // set queue mode - args.setOrdering(client::LVQ); - - Queue::shared_ptr queue(new Queue("my-queue", true )); - queue->configure(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - intrusive_ptr<Message> msg4 = createMessage("e", "D"); - intrusive_ptr<Message> received; - - //set deliever match for LVQ a,b,c,a - - string key; - args.getLVQKey(key); - BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - - - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"b"); - msg3->insertCustomProperty(key,"c"); - msg4->insertCustomProperty(key,"a"); - - //enqueue 4 message - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - queue->deliver(msg4); - - BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg4.get(), received.get()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg2.get(), received.get()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg3.get(), received.get()); - - intrusive_ptr<Message> msg5 = createMessage("e", "A"); - intrusive_ptr<Message> msg6 = createMessage("e", "B"); - intrusive_ptr<Message> msg7 = createMessage("e", "C"); - msg5->insertCustomProperty(key,"a"); - msg6->insertCustomProperty(key,"b"); - msg7->insertCustomProperty(key,"c"); - queue->deliver(msg5); - queue->deliver(msg6); - queue->deliver(msg7); - - BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg5.get(), received.get()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg6.get(), received.get()); - - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg7.get(), received.get()); - + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent()); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent()); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent()); } QPID_AUTO_TEST_CASE(testLVQEmptyKey){ - client::QueueOptions args; - // set queue mode - args.setOrdering(client::LVQ); - - Queue::shared_ptr queue(new Queue("my-queue", true )); - queue->configure(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - - string key; - args.getLVQKey(key); - BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - - - msg1->insertCustomProperty(key,"a"); - queue->deliver(msg1); - queue->deliver(msg2); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); - -} - -QPID_AUTO_TEST_CASE(testLVQAcquire){ - - client::QueueOptions args; - // set queue mode - args.setOrdering(client::LVQ); - // disable flow control, as this test violates the enqueue/dequeue sequence. - args.setInt(QueueFlowLimit::flowStopCountKey, 0); - - Queue::shared_ptr queue(new Queue("my-queue", true )); - queue->configure(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - intrusive_ptr<Message> msg3 = createMessage("e", "C"); - intrusive_ptr<Message> msg4 = createMessage("e", "D"); - intrusive_ptr<Message> msg5 = createMessage("e", "F"); - intrusive_ptr<Message> msg6 = createMessage("e", "G"); - - //set deliever match for LVQ a,b,c,a - - string key; - args.getLVQKey(key); - BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - - - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"b"); - msg3->insertCustomProperty(key,"c"); - msg4->insertCustomProperty(key,"a"); - msg5->insertCustomProperty(key,"b"); - msg6->insertCustomProperty(key,"c"); - - //enqueue 4 message - queue->deliver(msg1); - queue->deliver(msg2); - queue->deliver(msg3); - queue->deliver(msg4); - - BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - - framing::SequenceNumber sequence(1); - QueuedMessage qmsg(queue.get(), msg1, sequence); - QueuedMessage qmsg2(queue.get(), msg2, ++sequence); - framing::SequenceNumber sequence1(10); - QueuedMessage qmsg3(queue.get(), 0, sequence1); - TestConsumer::shared_ptr dummy(new TestConsumer()); - - BOOST_CHECK(!queue->acquire(qmsg, dummy->getName())); - BOOST_CHECK(queue->acquire(qmsg2, dummy->getName())); - // Acquire the massage again to test failure case. - BOOST_CHECK(!queue->acquire(qmsg2, dummy->getName())); - BOOST_CHECK(!queue->acquire(qmsg3, dummy->getName())); - - BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); - - queue->deliver(msg5); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - - // set mode to no browse and check - args.setOrdering(client::LVQ_NO_BROWSE); - queue->configure(args); - TestConsumer::shared_ptr c1(new TestConsumer("test", false)); - - queue->dispatch(c1); - queue->dispatch(c1); - queue->dispatch(c1); - - queue->deliver(msg6); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); - - intrusive_ptr<Message> received; - received = queue->get().payload; - BOOST_CHECK_EQUAL(msg4.get(), received.get()); - -} - -QPID_AUTO_TEST_CASE(testLVQMultiQueue){ - - client::QueueOptions args; - // set queue mode - args.setOrdering(client::LVQ); - - Queue::shared_ptr queue1(new Queue("my-queue", true )); - Queue::shared_ptr queue2(new Queue("my-queue", true )); - intrusive_ptr<Message> received; - queue1->configure(args); - queue2->configure(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "A"); - - string key; - args.getLVQKey(key); - BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"a"); - - queue1->deliver(msg1); - queue2->deliver(msg1); - queue1->deliver(msg2); - - received = queue1->get().payload; - BOOST_CHECK_EQUAL(msg2.get(), received.get()); - - received = queue2->get().payload; - BOOST_CHECK_EQUAL(msg1.get(), received.get()); + QueueSettings settings; + string key="key"; + settings.lvqKey = key; + QueueFactory factory; + Queue::shared_ptr q(factory.create("my-queue", settings)); -} -QPID_AUTO_TEST_CASE(testLVQRecover){ - -/* simulate this - 1. start 2 nodes - 2. create cluster durable lvq - 3. send a transient message to the queue - 4. kill one of the nodes (to trigger force persistent behaviour)... - 5. then restart it (to turn off force persistent behaviour) - 6. send another transient message with same lvq key as in 3 - 7. kill the second node again (retrigger force persistent) - 8. stop and recover the first node -*/ - TestMessageStoreOC testStore; - client::QueueOptions args; - // set queue mode - args.setOrdering(client::LVQ); - args.setPersistLastNode(); - - Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); - intrusive_ptr<Message> received; - queue1->create(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - intrusive_ptr<Message> msg2 = createMessage("e", "A"); - // 2 - string key; - args.getLVQKey(key); - BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); - - msg1->insertCustomProperty(key,"a"); - msg2->insertCustomProperty(key,"a"); - // 3 - queue1->deliver(msg1); - // 4 - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); - // 5 - queue1->clearLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); - // 6 - queue1->deliver(msg2); - BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - BOOST_CHECK_EQUAL(testStore.deqCnt, 1u); + qpid::types::Variant::Map properties; + properties["key"] = "a"; + q->deliver(MessageUtils::createMessage(properties, "one")); + properties.clear(); + q->deliver(MessageUtils::createMessage(properties, "two")); + BOOST_CHECK_EQUAL(q->getMessageCount(), 2u); } void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); - m->computeExpiration(new broker::ExpiryPolicy); + Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); + m.computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } } @@ -706,7 +203,7 @@ QPID_AUTO_TEST_CASE(testPurgeExpired) { QPID_AUTO_TEST_CASE(testQueueCleaner) { Timer timer; QueueRegistry queues; - Queue::shared_ptr queue = queues.declare("my-queue").first; + Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first; addMessagesToQueue(10, *queue, 200, 400); BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); @@ -717,44 +214,57 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { ::usleep(300*1000); BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } - - namespace { - // helper for group tests - void verifyAcquire( Queue::shared_ptr queue, - TestConsumer::shared_ptr c, - std::deque<QueuedMessage>& results, - const std::string& expectedGroup, - const int expectedId ) - { - BOOST_CHECK(queue->dispatch(c)); - results.push_back(c->last); - std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); +int getIntProperty(const Message& message, const std::string& key) +{ + qpid::types::Variant v = message.getProperty(key); + int i(0); + if (!v.isVoid()) i = v; + return i; +} +// helper for group tests +void verifyAcquire( Queue::shared_ptr queue, + TestConsumer::shared_ptr c, + std::deque<QueueCursor>& results, + const std::string& expectedGroup, + const int expectedId ) +{ + bool success = queue->dispatch(c); + BOOST_CHECK(success); + if (success) { + results.push_back(c->lastCursor); + std::string group = c->lastMessage.getPropertyAsString("GROUP-ID"); + int id = getIntProperty(c->lastMessage, "MY-ID"); BOOST_CHECK_EQUAL( group, expectedGroup ); BOOST_CHECK_EQUAL( id, expectedId ); } } +Message createGroupMessage(int id, const std::string& group) +{ + qpid::types::Variant::Map properties; + properties["GROUP-ID"] = group; + properties["MY-ID"] = id; + return MessageUtils::createMessage(properties); +} +} + QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // // Verify that consumers of grouped messages own the groups once a message is acquired, // and release the groups once all acquired messages have been dequeued or requeued // - FieldTable args; - Queue::shared_ptr queue(new Queue("my_queue", true)); - args.setString("qpid.group_header_key", "GROUP-ID"); - args.setInt("qpid.shared_msg_group", 1); - queue->configure(args); + QueueSettings settings; + settings.shareGroups = 1; + settings.groupKey = "GROUP-ID"; + QueueFactory factory; + Queue::shared_ptr queue(factory.create("my_queue", settings)); std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), std::string("b"), std::string("b"), std::string("b"), std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { - intrusive_ptr<Message> msg = createMessage("e", "A"); - msg->insertCustomProperty("GROUP-ID", groups[i]); - msg->insertCustomProperty("MY-ID", i); - queue->deliver(msg); + queue->deliver(createGroupMessage(i, groups[i])); } // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... @@ -768,8 +278,8 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { queue->consume(c1); queue->consume(c2); - std::deque<QueuedMessage> dequeMeC1; - std::deque<QueuedMessage> dequeMeC2; + std::deque<QueueCursor> dequeMeC1; + std::deque<QueueCursor> dequeMeC2; verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) @@ -828,9 +338,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 // what happens if C-2 "requeues" a-1 and a-2? - queue->requeue( dequeMeC2.front() ); + queue->release( dequeMeC2.front() ); dequeMeC2.pop_front(); - queue->requeue( dequeMeC2.front() ); + queue->release( dequeMeC2.front() ); dequeMeC2.pop_front(); // now just has c-7 acquired // Queue = a-1, a-2, b-4, b-5, c-7, c-8... @@ -855,9 +365,9 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); - // requeue all of C1's acquired messages, then cancel C1 + // release all of C1's acquired messages, then cancel C1 while (!dequeMeC1.empty()) { - queue->requeue(dequeMeC1.front()); + queue->release(dequeMeC1.front()); dequeMeC1.pop_front(); } queue->cancel(c1); @@ -877,7 +387,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Owners= ---, ---, --- TestConsumer::shared_ptr c3(new TestConsumer("C3")); - std::deque<QueuedMessage> dequeMeC3; + std::deque<QueueCursor> dequeMeC3; verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); @@ -897,11 +407,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Queue = a-2, // Owners= ^C3, - - intrusive_ptr<Message> msg = createMessage("e", "A"); - msg->insertCustomProperty("GROUP-ID", "a"); - msg->insertCustomProperty("MY-ID", 9); - queue->deliver(msg); + queue->deliver(createGroupMessage(9, "a")); // Queue = a-2, a-9 // Owners= ^C3, ^C3 @@ -909,10 +415,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); - msg = createMessage("e", "A"); - msg->insertCustomProperty("GROUP-ID", "b"); - msg->insertCustomProperty("MY-ID", 10); - queue->deliver(msg); + queue->deliver(createGroupMessage(10, "b")); // Queue = a-2, a-9, b-10 // Owners= ^C3, ^C3, ---- @@ -933,17 +436,17 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { // Verify that the same default group name is automatically applied to messages that // do not specify a group name. // - FieldTable args; - Queue::shared_ptr queue(new Queue("my_queue", true)); - args.setString("qpid.group_header_key", "GROUP-ID"); - args.setInt("qpid.shared_msg_group", 1); - queue->configure(args); + QueueSettings settings; + settings.shareGroups = 1; + settings.groupKey = "GROUP-ID"; + QueueFactory factory; + Queue::shared_ptr queue(factory.create("my_queue", settings)); for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg = createMessage("e", "A"); + qpid::types::Variant::Map properties; // no "GROUP-ID" header - msg->insertCustomProperty("MY-ID", i); - queue->deliver(msg); + properties["MY-ID"] = i; + queue->deliver(MessageUtils::createMessage(properties)); } // Queue = 0, 1, 2 @@ -956,20 +459,20 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { queue->consume(c1); queue->consume(c2); - std::deque<QueuedMessage> dequeMeC1; - std::deque<QueuedMessage> dequeMeC2; + std::deque<QueueCursor> dequeMeC1; + std::deque<QueueCursor> dequeMeC2; queue->dispatch(c1); // c1 now owns default group (acquired 0) - dequeMeC1.push_back(c1->last); - int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + dequeMeC1.push_back(c1->lastCursor); + int id = getIntProperty(c1->lastMessage, "MY-ID"); BOOST_CHECK_EQUAL( id, 0 ); bool gotOne = queue->dispatch(c2); // c2 should get nothing BOOST_CHECK( !gotOne ); queue->dispatch(c1); // c1 now acquires 1 - dequeMeC1.push_back(c1->last); - id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + dequeMeC1.push_back(c1->lastCursor); + id = getIntProperty(c1->lastMessage, "MY-ID"); BOOST_CHECK_EQUAL( id, 1 ); gotOne = queue->dispatch(c2); // c2 should still get nothing @@ -982,7 +485,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { // now default group should be available... queue->dispatch(c2); // c2 now owns default group (acquired 2) - id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + id = c2->lastMessage.getProperty("MY-ID"); BOOST_CHECK_EQUAL( id, 2 ); gotOne = queue->dispatch(c1); // c1 should get nothing @@ -992,556 +495,128 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { queue->cancel(c2); } -QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ - - TestMessageStoreOC testStore; - client::QueueOptions args; - args.setPersistLastNode(); - - Queue::shared_ptr queue1(new Queue("queue1", true, &testStore )); - queue1->create(args); - Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); - queue2->create(args); - - intrusive_ptr<Message> msg1 = createMessage("e", "A"); - - queue1->deliver(msg1); - queue2->deliver(msg1); - - //change mode - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - - // check they don't get stored twice - queue1->setLastNodeFailure(); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - - intrusive_ptr<Message> msg2 = createMessage("e", "B"); - queue1->deliver(msg2); - queue2->deliver(msg2); - - queue1->clearLastNodeFailure(); - queue2->clearLastNodeFailure(); - // check only new messages get forced - queue1->setLastNodeFailure(); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); - - // check no failure messages are stored - queue1->clearLastNodeFailure(); - queue2->clearLastNodeFailure(); - - intrusive_ptr<Message> msg3 = createMessage("e", "B"); - queue1->deliver(msg3); - queue2->deliver(msg3); - BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); - queue1->setLastNodeFailure(); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 6u); - - /** - * TODO: Fix or replace the following test which incorrectly requeues a - * message that was never on the queue in the first place. This relied on - * internal details not part of the queue abstraction. - - // check requeue 1 - intrusive_ptr<Message> msg4 = createMessage("e", "C"); - intrusive_ptr<Message> msg5 = createMessage("e", "D"); - - framing::SequenceNumber sequence(1); - QueuedMessage qmsg1(queue1.get(), msg4, sequence); - QueuedMessage qmsg2(queue2.get(), msg5, ++sequence); - - queue1->requeue(qmsg1); - BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); - - // check requeue 2 - queue2->clearLastNodeFailure(); - queue2->requeue(qmsg2); - BOOST_CHECK_EQUAL(testStore.enqCnt, 7u); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); - - queue2->clearLastNodeFailure(); - queue2->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 8u); - */ -} - -QPID_AUTO_TEST_CASE(testLastNodeRecoverAndFail){ -/* -simulate this: - 1. start two nodes - 2. create cluster durable queue and add some messages - 3. kill one node (trigger force-persistent behaviour) - 4. stop and recover remaining node - 5. add another node - 6. kill that new node again -make sure that an attempt to re-enqueue a message does not happen which will -result in the last man standing exiting with an error. - -we need to make sure that recover is safe, i.e. messages are -not requeued to the store. -*/ - TestMessageStoreOC testStore; - client::QueueOptions args; - // set queue mode - args.setPersistLastNode(); - - Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); - intrusive_ptr<Message> received; - queue1->create(args); - - // check requeue 1 - intrusive_ptr<Message> msg1 = createMessage("e", "C"); - intrusive_ptr<Message> msg2 = createMessage("e", "D"); - - queue1->recover(msg1); - - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); - - queue1->clearLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); - - queue1->deliver(msg2); - BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 1u); - -} - -QPID_AUTO_TEST_CASE(testLastNodeJournalError){ -/* -simulate store exception going into last node standing - -*/ - TestMessageStoreOC testStore; - client::QueueOptions args; - // set queue mode - args.setPersistLastNode(); - - Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore)); - intrusive_ptr<Message> received; - queue1->configure(args); - - // check requeue 1 - intrusive_ptr<Message> msg1 = createMessage("e", "C"); - - queue1->deliver(msg1); - testStore.createError(); - - ScopedSuppressLogging sl; // Suppress messages for expected errors. - queue1->setLastNodeFailure(); - BOOST_CHECK_EQUAL(testStore.enqCnt, 0u); - -} - -intrusive_ptr<Message> mkMsg(MessageStore& store, std::string content = "", bool durable = false) -{ - intrusive_ptr<Message> msg = MessageUtils::createMessage("", "", durable); - if (content.size()) MessageUtils::addContent(msg, content); - msg->setStore(&store); - return msg; -} - -QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ - - TestMessageStoreOC testStore; - client::QueueOptions args0; // No size policy - client::QueueOptions args1; - args1.setSizePolicy(FLOW_TO_DISK, 0, 1); - client::QueueOptions args2; - args2.setSizePolicy(FLOW_TO_DISK, 0, 2); - - // --- Fanout exchange bound to single transient queue ------------------------------------------------------------- - - FanOutExchange sbtFanout1("sbtFanout1", false, args0); // single binding to transient queue - Queue::shared_ptr tq1(new Queue("tq1", true)); // transient w/ limit - tq1->configure(args1); - sbtFanout1.bind(tq1, "", 0); - - intrusive_ptr<Message> msg01 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg01(msg01); - sbtFanout1.route(dmsg01); // Brings queue 1 to capacity limit - msg01->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg01->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); - - intrusive_ptr<Message> msg02 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg02(msg02); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg02), ResourceLimitExceededException); - } - msg02->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg02->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); - - intrusive_ptr<Message> msg03 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content - DeliverableMessage dmsg03(msg03); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg03), ResourceLimitExceededException); - } - msg03->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg03->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); - - intrusive_ptr<Message> msg04 = mkMsg(testStore); // transient no content - DeliverableMessage dmsg04(msg04); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg04), ResourceLimitExceededException); - } - msg04->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg04->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); - - intrusive_ptr<Message> msg05 = mkMsg(testStore, "", true); // durable no content - DeliverableMessage dmsg05(msg05); - { - ScopedSuppressLogging sl; // suppress expected error messages. - BOOST_CHECK_THROW(sbtFanout1.route(dmsg05), ResourceLimitExceededException); - } - msg05->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg05->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, tq1->getMessageCount()); - - // --- Fanout exchange bound to single durable queue --------------------------------------------------------------- - - FanOutExchange sbdFanout2("sbdFanout2", false, args0); // single binding to durable queue - Queue::shared_ptr dq2(new Queue("dq2", true, &testStore)); // durable w/ limit - dq2->configure(args1); - sbdFanout2.bind(dq2, "", 0); - - intrusive_ptr<Message> msg06 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg06(msg06); - sbdFanout2.route(dmsg06); // Brings queue 2 to capacity limit - msg06->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg06->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, dq2->getMessageCount()); - - intrusive_ptr<Message> msg07 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg07(msg07); - sbdFanout2.route(dmsg07); - msg07->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg07->isContentReleased(), true); - BOOST_CHECK_EQUAL(2u, dq2->getMessageCount()); - - intrusive_ptr<Message> msg08 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content - DeliverableMessage dmsg08(msg08); - sbdFanout2.route(dmsg08); - msg08->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg08->isContentReleased(), true); - BOOST_CHECK_EQUAL(3u, dq2->getMessageCount()); - - intrusive_ptr<Message> msg09 = mkMsg(testStore); // transient no content - DeliverableMessage dmsg09(msg09); - sbdFanout2.route(dmsg09); - msg09->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg09->isContentReleased(), true); - BOOST_CHECK_EQUAL(4u, dq2->getMessageCount()); - - intrusive_ptr<Message> msg10 = mkMsg(testStore, "", true); // durable no content - DeliverableMessage dmsg10(msg10); - sbdFanout2.route(dmsg10); - msg10->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg10->isContentReleased(), true); - BOOST_CHECK_EQUAL(5u, dq2->getMessageCount()); - - // --- Fanout exchange bound to multiple durable queues ------------------------------------------------------------ - - FanOutExchange mbdFanout3("mbdFanout3", false, args0); // multiple bindings to durable queues - Queue::shared_ptr dq3(new Queue("dq3", true, &testStore)); // durable w/ limit 2 - dq3->configure(args2); - mbdFanout3.bind(dq3, "", 0); - Queue::shared_ptr dq4(new Queue("dq4", true, &testStore)); // durable w/ limit 1 - dq4->configure(args1); - mbdFanout3.bind(dq4, "", 0); - Queue::shared_ptr dq5(new Queue("dq5", true, &testStore)); // durable no limit - dq5->configure(args0); - mbdFanout3.bind(dq5, "", 0); - - intrusive_ptr<Message> msg11 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg11(msg11); - mbdFanout3.route(dmsg11); // Brings queues 3 and 4 to capacity limit - msg11->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg11->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(1u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(1u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg12 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg12(msg12); - mbdFanout3.route(dmsg12); - msg12->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg12->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! - BOOST_CHECK_EQUAL(2u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(2u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(2u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg13 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content - DeliverableMessage dmsg13(msg13); - mbdFanout3.route(dmsg13); - msg13->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg13->isContentReleased(), true); - BOOST_CHECK_EQUAL(3u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(3u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(3u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg14 = mkMsg(testStore); // transient no content - DeliverableMessage dmsg14(msg14); - mbdFanout3.route(dmsg14); - msg14->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg14->isContentReleased(), false); // XXXX - consequence of transient msg multi-queue ftd policy-handling limitations, fix in broker at some point! - BOOST_CHECK_EQUAL(4u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(4u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(4u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg15 = mkMsg(testStore, "", true); // durable no content - DeliverableMessage dmsg15(msg15); - mbdFanout3.route(dmsg15); - msg15->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg15->isContentReleased(), true); - BOOST_CHECK_EQUAL(5u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(5u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(5u, dq5->getMessageCount()); - - // Bind a transient queue, this should block the release of any further messages. - // Note: this will result in a violation of the count policy of dq3 and dq4 - but this - // is expected until a better overall multi-queue design is implemented. Similarly - // for the other tests in this section. - - Queue::shared_ptr tq6(new Queue("tq6", true)); // transient no limit - tq6->configure(args0); - mbdFanout3.bind(tq6, "", 0); - - intrusive_ptr<Message> msg16 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg16(msg16); - mbdFanout3.route(dmsg16); - msg16->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg16->isContentReleased(), false); - BOOST_CHECK_EQUAL(6u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(6u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(6u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg17 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content - DeliverableMessage dmsg17(msg17); - mbdFanout3.route(dmsg17); - msg17->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg17->isContentReleased(), false); - BOOST_CHECK_EQUAL(7u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(7u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(7u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg18 = mkMsg(testStore); // transient no content - DeliverableMessage dmsg18(msg18); - mbdFanout3.route(dmsg18); - msg18->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg18->isContentReleased(), false); - BOOST_CHECK_EQUAL(8u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(8u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(8u, dq5->getMessageCount()); - - intrusive_ptr<Message> msg19 = mkMsg(testStore, "", true); // durable no content - DeliverableMessage dmsg19(msg19); - mbdFanout3.route(dmsg19); - msg19->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg19->isContentReleased(), false); - BOOST_CHECK_EQUAL(9u, dq3->getMessageCount()); - BOOST_CHECK_EQUAL(9u, dq4->getMessageCount()); - BOOST_CHECK_EQUAL(9u, dq5->getMessageCount()); - - - // --- Fanout exchange bound to multiple durable and transient queues ---------------------------------------------- - - FanOutExchange mbmFanout4("mbmFanout4", false, args0); // multiple bindings to durable/transient queues - Queue::shared_ptr dq7(new Queue("dq7", true, &testStore)); // durable no limit - dq7->configure(args0); - mbmFanout4.bind(dq7, "", 0); - Queue::shared_ptr dq8(new Queue("dq8", true, &testStore)); // durable w/ limit - dq8->configure(args1); - mbmFanout4.bind(dq8, "", 0); - Queue::shared_ptr tq9(new Queue("tq9", true)); // transient no limit - tq9->configure(args0); - mbmFanout4.bind(tq9, "", 0); - - intrusive_ptr<Message> msg20 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg20(msg20); - mbmFanout4.route(dmsg20); // Brings queue 7 to capacity limit - msg20->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg20->isContentReleased(), false); - BOOST_CHECK_EQUAL(1u, dq7->getMessageCount()); - BOOST_CHECK_EQUAL(1u, dq8->getMessageCount()); - BOOST_CHECK_EQUAL(1u, tq9->getMessageCount()); - - intrusive_ptr<Message> msg21 = mkMsg(testStore, std::string(5, 'X')); // transient w/ content - DeliverableMessage dmsg21(msg21); - mbmFanout4.route(dmsg21); - msg21->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg21->isContentReleased(), false); - BOOST_CHECK_EQUAL(2u, dq7->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(2u, dq8->getMessageCount()); - BOOST_CHECK_EQUAL(2u, tq9->getMessageCount()); - - intrusive_ptr<Message> msg22 = mkMsg(testStore, std::string(5, 'X'), true); // durable w/ content - DeliverableMessage dmsg22(msg22); - mbmFanout4.route(dmsg22); - msg22->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg22->isContentReleased(), false); - BOOST_CHECK_EQUAL(3u, dq7->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(3u, dq8->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(3u, tq9->getMessageCount()); - - intrusive_ptr<Message> msg23 = mkMsg(testStore); // transient no content - DeliverableMessage dmsg23(msg23); - mbmFanout4.route(dmsg23); - msg23->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg23->isContentReleased(), false); - BOOST_CHECK_EQUAL(4u, dq7->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(4u, dq8->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(4u, tq9->getMessageCount()); - - intrusive_ptr<Message> msg24 = mkMsg(testStore, "", true); // durable no content - DeliverableMessage dmsg24(msg24); - mbmFanout4.route(dmsg24); - msg24->tryReleaseContent(); - BOOST_CHECK_EQUAL(msg24->isContentReleased(), false); - BOOST_CHECK_EQUAL(5u, dq7->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(5u, dq8->getMessageCount()); // over limit - BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); -} - QPID_AUTO_TEST_CASE(testSetPositionFifo) { Queue::shared_ptr q(new Queue("my-queue", true)); BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0)); for (int i = 0; i < 10; ++i) - q->deliver(contentMessage(boost::lexical_cast<string>(i+1))); + q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1))); // Verify the front of the queue TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1 - BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1 + BOOST_CHECK_EQUAL("1", c->lastMessage.getContent()); + // Verify the back of the queue - QueuedMessage qm; BOOST_CHECK_EQUAL(10u, q->getPosition()); - BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue - BOOST_CHECK_EQUAL("10", getContent(qm.payload)); BOOST_CHECK_EQUAL(10u, q->getMessageCount()); // Using setPosition to introduce a gap in sequence numbers. q->setPosition(15); BOOST_CHECK_EQUAL(10u, q->getMessageCount()); BOOST_CHECK_EQUAL(15u, q->getPosition()); - BOOST_CHECK(q->find(10, qm)); // Back of the queue - BOOST_CHECK_EQUAL("10", getContent(qm.payload)); - q->deliver(contentMessage("16")); - c->setPosition(9); + q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16")); + + q->seek(*c, Queue::MessagePredicate(), 9); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(10u, c->last.position); - BOOST_CHECK_EQUAL("10", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("10", c->lastMessage.getContent()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(16u, c->last.position); - BOOST_CHECK_EQUAL("16", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("16", c->lastMessage.getContent()); // Using setPosition to trunkcate the queue q->setPosition(5); BOOST_CHECK_EQUAL(5u, q->getMessageCount()); - q->deliver(contentMessage("6a")); - c->setPosition(4); + q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a")); + c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire + q->seek(*c, Queue::MessagePredicate(), 4); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(5u, c->last.position); - BOOST_CHECK_EQUAL("5", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("5", c->lastMessage.getContent()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(6u, c->last.position); - BOOST_CHECK_EQUAL("6a", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent()); BOOST_CHECK(!q->dispatch(c)); // No more messages. } QPID_AUTO_TEST_CASE(testSetPositionLvq) { - Queue::shared_ptr q(new Queue("my-queue", true)); + QueueSettings settings; string key="key"; - framing::FieldTable args; - args.setString("qpid.last_value_queue_key", "key"); - q->configure(args); + settings.lvqKey = key; + QueueFactory factory; + Queue::shared_ptr q(factory.create("my-queue", settings)); const char* values[] = { "a", "b", "c", "a", "b", "c" }; for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { - intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); - m->insertCustomProperty(key, values[i]); - q->deliver(m); + qpid::types::Variant::Map properties; + properties[key] = values[i]; + q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); } BOOST_CHECK_EQUAL(3u, q->getMessageCount()); // Verify the front of the queue TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1 - BOOST_CHECK_EQUAL("4", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1 + BOOST_CHECK_EQUAL("4", c->lastMessage.getContent()); // Verify the back of the queue - QueuedMessage qm; BOOST_CHECK_EQUAL(6u, q->getPosition()); - BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue - BOOST_CHECK_EQUAL("6", getContent(qm.payload)); q->setPosition(5); - c->setPosition(4); + + c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire + q->seek(*c, Queue::MessagePredicate(), 4); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1 BOOST_CHECK(!q->dispatch(c)); } QPID_AUTO_TEST_CASE(testSetPositionPriority) { - Queue::shared_ptr q(new Queue("my-queue", true)); - framing::FieldTable args; - args.setInt("qpid.priorities", 10); - q->configure(args); + QueueSettings settings; + settings.priorities = 10; + QueueFactory factory; + Queue::shared_ptr q(factory.create("my-queue", settings)); const int priorities[] = { 1, 2, 3, 2, 1, 3 }; for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) { - intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); - m->getFrames().getHeaders()->get<DeliveryProperties>(true) - ->setPriority(priorities[i]); - q->deliver(m); + qpid::types::Variant::Map properties; + properties["priority"] = priorities[i]; + q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); } // Truncation removes messages in fifo order, not priority order. q->setPosition(3); - TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->last.position); + BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(2u, c->last.position); + BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(3u, c->last.position); + BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); BOOST_CHECK(!q->dispatch(c)); - intrusive_ptr<Message> m = contentMessage("4a"); - m->getFrames().getHeaders()->get<DeliveryProperties>(true) - ->setPriority(4); - q->deliver(m); + qpid::types::Variant::Map properties; + properties["priority"] = 4; + q->deliver(MessageUtils::createMessage(properties, "4a")); + BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->last.position); - BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent()); // But consumers see priority order c.reset(new TestConsumer("test", true)); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->last.position); - BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(3u, c->last.position); - BOOST_CHECK_EQUAL("3", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("3", c->lastMessage.getContent()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(2u, c->last.position); - BOOST_CHECK_EQUAL("2", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("2", c->lastMessage.getContent()); BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->last.position); - BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); + BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); + BOOST_CHECK_EQUAL("1", c->lastMessage.getContent()); } QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/ReplicationTest.cpp b/qpid/cpp/src/tests/ReplicationTest.cpp deleted file mode 100644 index 055f06579f..0000000000 --- a/qpid/cpp/src/tests/ReplicationTest.cpp +++ /dev/null @@ -1,144 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "unit_test.h" -#include "test_tools.h" -#include "config.h" -#include "BrokerFixture.h" - -#include "qpid/Plugin.h" -#include "qpid/broker/Broker.h" -#include "qpid/client/QueueOptions.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/replication/constants.h" -#include "qpid/sys/Shlib.h" - -#include <string> -#include <sstream> -#include <vector> - -#include <boost/assign.hpp> -#include <boost/bind.hpp> - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::replication::constants; -using boost::assign::list_of; - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(ReplicationTestSuite) - -// FIXME aconway 2009-11-26: clean this up. -// The CMake-based build passes in the module suffix; if it's not there, this -// is a Linux/UNIX libtool-based build. -#if defined (QPID_MODULE_PREFIX) && defined (QPID_MODULE_SUFFIX) -static const char *default_shlib = - QPID_MODULE_PREFIX "replicating_listener" QPID_MODULE_POSTFIX QPID_MODULE_SUFFIX; -#else -static const char *default_shlib = ".libs/replicating_listener.so"; -#endif -qpid::sys::Shlib plugin(getLibPath("REPLICATING_LISTENER_LIB", default_shlib)); - -qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args) -{ - std::vector<const char*> argv(args.size()); - transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); - - qpid::broker::Broker::Options opts; - qpid::Plugin::addOptions(opts); - opts.parse(argv.size(), &argv[0], "", true); - return opts; -} - -QPID_AUTO_TEST_CASE(testReplicationExchange) -{ - qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<std::string>("qpidd") - ("--replication-exchange-name=qpid.replication"))); - SessionFixture f(brokerOpts); - - - std::string dataQ("queue-1"); - std::string eventQ("event-queue-1"); - std::string dataQ2("queue-2"); - std::string eventQ2("event-queue-2"); - FieldTable eventQopts; - eventQopts.setString("qpid.insert_sequence_numbers", REPLICATION_EVENT_SEQNO); - - f.session.queueDeclare(arg::queue=eventQ, arg::exclusive=true, arg::autoDelete=true, arg::arguments=eventQopts); - f.session.exchangeBind(arg::exchange="qpid.replication", arg::queue=eventQ, arg::bindingKey=dataQ); - - f.session.queueDeclare(arg::queue=eventQ2, arg::exclusive=true, arg::autoDelete=true, arg::arguments=eventQopts); - f.session.exchangeBind(arg::exchange="qpid.replication", arg::queue=eventQ2, arg::bindingKey=dataQ2); - - QueueOptions args; - args.enableQueueEvents(false); - f.session.queueDeclare(arg::queue=dataQ, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - f.session.queueDeclare(arg::queue=dataQ2, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - for (int i = 0; i < 10; i++) { - f.session.messageTransfer(arg::content=Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), dataQ)); - f.session.messageTransfer(arg::content=Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), dataQ2)); - } - Message msg; - LocalQueue incoming; - Subscription sub = f.subs.subscribe(incoming, dataQ); - for (int i = 0; i < 10; i++) { - BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); - } - BOOST_CHECK(!f.subs.get(msg, dataQ)); - - sub.cancel(); - sub = f.subs.subscribe(incoming, eventQ); - //check that we received enqueue events for first queue: - for (int i = 0; i < 10; i++) { - BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), ENQUEUE); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+1)); - BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); - } - //check that we received dequeue events for first queue: - for (int i = 0; i < 10; i++) { - BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), DEQUEUE); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(DEQUEUED_MESSAGE_POSITION), (i+1)); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+11)); - } - - sub.cancel(); - sub = f.subs.subscribe(incoming, eventQ2); - //check that we received enqueue events for second queue: - for (int i = 0; i < 10; i++) { - BOOST_CHECK(incoming.get(msg, qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsString(REPLICATION_TARGET_QUEUE), dataQ2); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt(REPLICATION_EVENT_TYPE), ENQUEUE); - BOOST_CHECK_EQUAL(msg.getHeaders().getAsInt64(REPLICATION_EVENT_SEQNO), (i+1)); - BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); - } -} - - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h index 72cb50cd21..bf21104f70 100644 --- a/qpid/cpp/src/tests/TxMocks.h +++ b/qpid/cpp/src/tests/TxMocks.h @@ -119,8 +119,6 @@ public: assertEqualVector(expected, actual); } - void accept(TxOpConstVisitor&) const {} - ~MockTxOp(){} }; diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp deleted file mode 100644 index a636646035..0000000000 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/RecoveryManager.h" -#include "qpid/broker/TxPublish.h" -#include "unit_test.h" -#include <iostream> -#include <list> -#include <vector> -#include "MessageUtils.h" -#include "TestMessageStore.h" - -using std::list; -using std::pair; -using std::vector; -using boost::intrusive_ptr; -using namespace qpid::broker; -using namespace qpid::framing; - -namespace qpid { -namespace tests { - -struct TxPublishTest -{ - - TestMessageStore store; - Queue::shared_ptr queue1; - Queue::shared_ptr queue2; - intrusive_ptr<Message> msg; - TxPublish op; - - TxPublishTest() : - queue1(new Queue("queue1", false, &store, 0)), - queue2(new Queue("queue2", false, &store, 0)), - msg(MessageUtils::createMessage("exchange", "routing_key", true)), - op(msg) - { - op.deliverTo(queue1); - op.deliverTo(queue2); - } -}; - - -QPID_AUTO_TEST_SUITE(TxPublishTestSuite) - -QPID_AUTO_TEST_CASE(testPrepare) -{ - TxPublishTest t; - - intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(t.msg); - //ensure messages are enqueued in store - t.op.prepare(0); - BOOST_CHECK_EQUAL((size_t) 2, t.store.enqueued.size()); - BOOST_CHECK_EQUAL(std::string("queue1"), t.store.enqueued[0].first); - BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second); - BOOST_CHECK_EQUAL(std::string("queue2"), t.store.enqueued[1].first); - BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second); - BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete()); -} - -QPID_AUTO_TEST_CASE(testCommit) -{ - TxPublishTest t; - - //ensure messages are delivered to queue - t.op.prepare(0); - t.op.commit(); - BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount()); - intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload; - - BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete()); - BOOST_CHECK_EQUAL(t.msg, msg_dequeue); - - BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount()); - BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 246b0ed423..310ef844bd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -583,15 +583,7 @@ class ReplicationTests(BrokerTest): s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) - - # FIXME aconway 2012-02-22: there is a bug in priority ring - # queues that allows a low priority message to displace a high - # one. The following commented-out assert_browse is for the - # correct result, the uncommented one is for the actualy buggy - # result. See https://issues.apache.org/jira/browse/QPID-3866 - # - # expect = sorted(priorities,reverse=True)[0:5] - expect = [9,9,9,9,2] + expect = sorted(priorities,reverse=True)[0:5] primary.assert_browse("q", expect, transform=lambda m: m.priority) backup.assert_browse_backup("q", expect, transform=lambda m: m.priority) diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index 257e77b6b4..83f6a5e4b1 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -34,6 +34,7 @@ #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" #include "qpid/Plugin.h" @@ -95,7 +96,7 @@ class TestStore : public NullMessageStore { const boost::intrusive_ptr<PersistableMessage>& pmsg, const PersistableQueue& ) { - Message* msg = dynamic_cast<Message*>(pmsg.get()); + qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); assert(msg); // Dump the message if there is a dump file. |
