diff options
Diffstat (limited to 'qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 629 |
1 files changed, 0 insertions, 629 deletions
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp deleted file mode 100644 index ee9d37e76d..0000000000 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ /dev/null @@ -1,629 +0,0 @@ - /* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessageUtils.h" -#include "unit_test.h" -#include "test_tools.h" -#include "qpid/Exception.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/FanOutExchange.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Deliverable.h" -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/framing/DeliveryProperties.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/client/QueueOptions.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/broker/QueueFlowLimit.h" -#include "qpid/broker/QueueSettings.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Timer.h" - -#include <iostream> -#include <vector> -#include <boost/format.hpp> -#include <boost/lexical_cast.hpp> - -using namespace std; -using boost::intrusive_ptr; -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qpid { -namespace tests { -class TestConsumer : public virtual Consumer{ -public: - typedef boost::shared_ptr<TestConsumer> shared_ptr; - - QueueCursor lastCursor; - Message lastMessage; - bool received; - TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER, ""), received(false) {}; - - virtual bool deliver(const QueueCursor& cursor, const Message& message){ - lastCursor = cursor; - lastMessage = message; - received = true; - return true; - }; - void notify() {} - void cancel() {} - void acknowledged(const DeliveryRecord&) {} - OwnershipToken* getSession() { return 0; } -}; - -class FailOnDeliver : public Deliverable -{ - Message msg; -public: - FailOnDeliver() : msg(MessageUtils::createMessage()) {} - void deliverTo(const boost::shared_ptr<Queue>& queue) - { - throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); - } - Message& getMessage() { return msg; } -}; - -QPID_AUTO_TEST_SUITE(QueueTestSuite) - -QPID_AUTO_TEST_CASE(testBound){ - //test the recording of bindings, and use of those to allow a queue to be unbound - string key("my-key"); - FieldTable args; - - Queue::shared_ptr queue(new Queue("my-queue")); - ExchangeRegistry exchanges; - //establish bindings from exchange->queue and notify the queue as it is bound: - Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; - exchange1->bind(queue, key, &args); - queue->bound(exchange1->getName(), key, args); - - Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; - exchange2->bind(queue, key, &args); - queue->bound(exchange2->getName(), key, args); - - Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; - exchange3->bind(queue, key, &args); - queue->bound(exchange3->getName(), key, args); - - //delete one of the exchanges: - exchanges.destroy(exchange2->getName()); - exchange2.reset(); - - //unbind the queue from all exchanges it knows it has been bound to: - queue->unbind(exchanges); - - //ensure the remaining exchanges don't still have the queue bound to them: - FailOnDeliver deliverable; - exchange1->route(deliverable); - exchange3->route(deliverable); -} - -QPID_AUTO_TEST_CASE(testLVQ){ - - QueueSettings settings; - string key="key"; - settings.lvqKey = key; - QueueFactory factory; - Queue::shared_ptr q(factory.create("my-queue", settings)); - - const char* values[] = { "a", "b", "c", "a"}; - for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { - qpid::types::Variant::Map properties; - properties[key] = values[i]; - q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); - } - BOOST_CHECK_EQUAL(q->getMessageCount(), 3u); - - TestConsumer::shared_ptr c(new TestConsumer("test", true)); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent()); - - - const char* values2[] = { "a", "b", "c"}; - for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) { - qpid::types::Variant::Map properties; - properties[key] = values[i]; - q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5))); - } - BOOST_CHECK_EQUAL(q->getMessageCount(), 3u); - - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent()); -} - -QPID_AUTO_TEST_CASE(testLVQEmptyKey){ - - QueueSettings settings; - string key="key"; - settings.lvqKey = key; - QueueFactory factory; - Queue::shared_ptr q(factory.create("my-queue", settings)); - - - qpid::types::Variant::Map properties; - properties["key"] = "a"; - q->deliver(MessageUtils::createMessage(properties, "one")); - properties.clear(); - q->deliver(MessageUtils::createMessage(properties, "two")); - BOOST_CHECK_EQUAL(q->getMessageCount(), 2u); -} - -void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) -{ - for (uint i = 0; i < count; i++) { - Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); - queue.deliver(m); - } -} - -QPID_AUTO_TEST_CASE(testPurgeExpired) { - Queue queue("my-queue"); - addMessagesToQueue(10, queue); - BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); - ::usleep(300*1000); - queue.purgeExpired(0); - BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); -} - -QPID_AUTO_TEST_CASE(testQueueCleaner) { - boost::shared_ptr<Poller> poller(new Poller); - Thread runner(poller.get()); - Timer timer; - QueueRegistry queues; - Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first; - addMessagesToQueue(10, *queue, 200, 400); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); - - QueueCleaner cleaner(queues, poller, &timer); - cleaner.start(100 * qpid::sys::TIME_MSEC); - ::usleep(300*1000); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); - ::usleep(300*1000); - BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); - poller->shutdown(); - runner.join(); -} -namespace { -int getIntProperty(const Message& message, const std::string& key) -{ - qpid::types::Variant v = message.getProperty(key); - int i(0); - if (!v.isVoid()) i = v; - return i; -} -// helper for group tests -void verifyAcquire( Queue::shared_ptr queue, - TestConsumer::shared_ptr c, - std::deque<QueueCursor>& results, - const std::string& expectedGroup, - const int expectedId ) -{ - bool success = queue->dispatch(c); - BOOST_CHECK(success); - if (success) { - results.push_back(c->lastCursor); - std::string group = c->lastMessage.getPropertyAsString("GROUP-ID"); - int id = getIntProperty(c->lastMessage, "MY-ID"); - BOOST_CHECK_EQUAL( group, expectedGroup ); - BOOST_CHECK_EQUAL( id, expectedId ); - } -} - -Message createGroupMessage(int id, const std::string& group) -{ - qpid::types::Variant::Map properties; - properties["GROUP-ID"] = group; - properties["MY-ID"] = id; - return MessageUtils::createMessage(properties); -} -} - -QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { - // - // Verify that consumers of grouped messages own the groups once a message is acquired, - // and release the groups once all acquired messages have been dequeued or requeued - // - QueueSettings settings; - settings.shareGroups = 1; - settings.groupKey = "GROUP-ID"; - QueueFactory factory; - Queue::shared_ptr queue(factory.create("my_queue", settings)); - - std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), - std::string("b"), std::string("b"), std::string("b"), - std::string("c"), std::string("c"), std::string("c") }; - for (int i = 0; i < 9; ++i) { - queue->deliver(createGroupMessage(i, groups[i])); - } - - // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, - - BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); - - TestConsumer::shared_ptr c1(new TestConsumer("C1")); - TestConsumer::shared_ptr c2(new TestConsumer("C2")); - - queue->consume(c1); - queue->consume(c2); - - std::deque<QueueCursor> dequeMeC1; - std::deque<QueueCursor> dequeMeC2; - - - verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) - verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) - - // now let c1 complete the 'a-0' message - this should free the 'a' group - queue->dequeue( 0, dequeMeC1.front() ); - dequeMeC1.pop_front(); - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- - - // now c2 should pick up the next 'a-1', since it is oldest free - verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- - - // c1 should only be able to snarf up the first "c" message now... - verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" - - // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 - - // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) - queue->dequeue( 0, dequeMeC2.front() ); - dequeMeC2.pop_front(); - - // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 - - // b group is free, c is owned by c1 - c1's next get should grab 'b-4' - verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); - - // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 - - // c2 can now only grab a-2, and that's all - verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); - - // now C2 can't get any more, since C1 owns "b" and "c" group... - bool gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) - queue->dequeue( 0, dequeMeC1.front() ); - dequeMeC1.pop_front(); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- - - // c2 can now grab c-7 - verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 - - // what happens if C-2 "requeues" a-1 and a-2? - queue->release( dequeMeC2.front() ); - dequeMeC2.pop_front(); - queue->release( dequeMeC2.front() ); - dequeMeC2.pop_front(); // now just has c-7 acquired - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 - - // now c1 will grab a-1 and a-2... - verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); - verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 - - // c2 can now acquire c-8 only - verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); - - // and c1 can get b-5 - verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); - - // should be no more acquire-able for anyone now: - gotOne = queue->dispatch(c1); - BOOST_CHECK( !gotOne ); - gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - // release all of C1's acquired messages, then cancel C1 - while (!dequeMeC1.empty()) { - queue->release(dequeMeC1.front()); - dequeMeC1.pop_front(); - } - queue->cancel(c1); - - // Queue = a-1, a-2, b-4, b-5, c-7, c-8... - // Owners= ---, ---, ---, ---, ^C2, ^C2 - - // b-4, a-1, a-2, b-5 all should be available, right? - verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); - - while (!dequeMeC2.empty()) { - queue->dequeue(0, dequeMeC2.front()); - dequeMeC2.pop_front(); - } - - // Queue = a-2, b-4, b-5 - // Owners= ---, ---, --- - - TestConsumer::shared_ptr c3(new TestConsumer("C3")); - queue->consume(c3); - std::deque<QueueCursor> dequeMeC3; - - verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); - verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); - - // Queue = a-2, b-4, b-5 - // Owners= ^C3, ^C2, ^C2 - - gotOne = queue->dispatch(c3); - BOOST_CHECK( !gotOne ); - - verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); - - while (!dequeMeC2.empty()) { - queue->dequeue(0, dequeMeC2.front()); - dequeMeC2.pop_front(); - } - - // Queue = a-2, - // Owners= ^C3, - queue->deliver(createGroupMessage(9, "a")); - - // Queue = a-2, a-9 - // Owners= ^C3, ^C3 - - gotOne = queue->dispatch(c2); - BOOST_CHECK( !gotOne ); - - queue->deliver(createGroupMessage(10, "b")); - - // Queue = a-2, a-9, b-10 - // Owners= ^C3, ^C3, ---- - - verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); - verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); - - gotOne = queue->dispatch(c3); - BOOST_CHECK( !gotOne ); - - queue->cancel(c2); - queue->cancel(c3); -} - - -QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { - // - // Verify that the same default group name is automatically applied to messages that - // do not specify a group name. - // - QueueSettings settings; - settings.shareGroups = 1; - settings.groupKey = "GROUP-ID"; - QueueFactory factory; - Queue::shared_ptr queue(factory.create("my_queue", settings)); - - for (int i = 0; i < 3; ++i) { - qpid::types::Variant::Map properties; - // no "GROUP-ID" header - properties["MY-ID"] = i; - queue->deliver(MessageUtils::createMessage(properties)); - } - - // Queue = 0, 1, 2 - - BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); - - TestConsumer::shared_ptr c1(new TestConsumer("C1")); - TestConsumer::shared_ptr c2(new TestConsumer("C2")); - - queue->consume(c1); - queue->consume(c2); - - std::deque<QueueCursor> dequeMeC1; - std::deque<QueueCursor> dequeMeC2; - - queue->dispatch(c1); // c1 now owns default group (acquired 0) - dequeMeC1.push_back(c1->lastCursor); - int id = getIntProperty(c1->lastMessage, "MY-ID"); - BOOST_CHECK_EQUAL( id, 0 ); - - bool gotOne = queue->dispatch(c2); // c2 should get nothing - BOOST_CHECK( !gotOne ); - - queue->dispatch(c1); // c1 now acquires 1 - dequeMeC1.push_back(c1->lastCursor); - id = getIntProperty(c1->lastMessage, "MY-ID"); - BOOST_CHECK_EQUAL( id, 1 ); - - gotOne = queue->dispatch(c2); // c2 should still get nothing - BOOST_CHECK( !gotOne ); - - while (!dequeMeC1.empty()) { - queue->dequeue(0, dequeMeC1.front()); - dequeMeC1.pop_front(); - } - - // now default group should be available... - queue->dispatch(c2); // c2 now owns default group (acquired 2) - id = c2->lastMessage.getProperty("MY-ID"); - BOOST_CHECK_EQUAL( id, 2 ); - - gotOne = queue->dispatch(c1); // c1 should get nothing - BOOST_CHECK( !gotOne ); - - queue->cancel(c1); - queue->cancel(c2); -} - -QPID_AUTO_TEST_CASE(testSetPositionFifo) { - Queue::shared_ptr q(new Queue("my-queue", true)); - BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0)); - for (int i = 0; i < 10; ++i) - q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1))); - - // Verify the front of the queue - TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1 - BOOST_CHECK_EQUAL("1", c->lastMessage.getContent()); - - // Verify the back of the queue - BOOST_CHECK_EQUAL(10u, q->getPosition()); - BOOST_CHECK_EQUAL(10u, q->getMessageCount()); - - // Using setPosition to introduce a gap in sequence numbers. - q->setPosition(15); - BOOST_CHECK_EQUAL(10u, q->getMessageCount()); - BOOST_CHECK_EQUAL(15u, q->getPosition()); - q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16")); - - q->seek(*c, Queue::MessagePredicate(), 9); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("10", c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("16", c->lastMessage.getContent()); - - // Using setPosition to trunkcate the queue - q->setPosition(5); - BOOST_CHECK_EQUAL(5u, q->getMessageCount()); - q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a")); - c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire - q->seek(*c, Queue::MessagePredicate(), 4); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("5", c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent()); - BOOST_CHECK(!q->dispatch(c)); // No more messages. -} - -QPID_AUTO_TEST_CASE(testSetPositionLvq) { - QueueSettings settings; - string key="key"; - settings.lvqKey = key; - QueueFactory factory; - Queue::shared_ptr q(factory.create("my-queue", settings)); - - const char* values[] = { "a", "b", "c", "a", "b", "c" }; - for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { - qpid::types::Variant::Map properties; - properties[key] = values[i]; - q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); - } - BOOST_CHECK_EQUAL(3u, q->getMessageCount()); - // Verify the front of the queue - TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1 - BOOST_CHECK_EQUAL("4", c->lastMessage.getContent()); - // Verify the back of the queue - BOOST_CHECK_EQUAL(6u, q->getPosition()); - - q->setPosition(5); - - c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire - q->seek(*c, Queue::MessagePredicate(), 4); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1 - BOOST_CHECK(!q->dispatch(c)); -} - -QPID_AUTO_TEST_CASE(testSetPositionPriority) { - QueueSettings settings; - settings.priorities = 10; - QueueFactory factory; - Queue::shared_ptr q(factory.create("my-queue", settings)); - - const int priorities[] = { 1, 2, 3, 2, 1, 3 }; - for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) { - qpid::types::Variant::Map properties; - properties["priority"] = priorities[i]; - q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1))); - } - - // Truncation removes messages in fifo order, not priority order. - q->setPosition(3); - TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); - BOOST_CHECK(!q->dispatch(c)); - - qpid::types::Variant::Map properties; - properties["priority"] = 4; - q->deliver(MessageUtils::createMessage(properties, "4a")); - - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent()); - - // But consumers see priority order - c.reset(new TestConsumer("test", true)); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("3", c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("2", c->lastMessage.getContent()); - BOOST_CHECK(q->dispatch(c)); - BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); - BOOST_CHECK_EQUAL("1", c->lastMessage.getContent()); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests |