summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp629
1 files changed, 0 insertions, 629 deletions
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
deleted file mode 100644
index ee9d37e76d..0000000000
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ /dev/null
@@ -1,629 +0,0 @@
- /*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "MessageUtils.h"
-#include "unit_test.h"
-#include "test_tools.h"
-#include "qpid/Exception.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/broker/FanOutExchange.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/NullMessageStore.h"
-#include "qpid/framing/DeliveryProperties.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/client/QueueOptions.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/broker/QueueFlowLimit.h"
-#include "qpid/broker/QueueSettings.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Timer.h"
-
-#include <iostream>
-#include <vector>
-#include <boost/format.hpp>
-#include <boost/lexical_cast.hpp>
-
-using namespace std;
-using boost::intrusive_ptr;
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qpid {
-namespace tests {
-class TestConsumer : public virtual Consumer{
-public:
- typedef boost::shared_ptr<TestConsumer> shared_ptr;
-
- QueueCursor lastCursor;
- Message lastMessage;
- bool received;
- TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER, ""), received(false) {};
-
- virtual bool deliver(const QueueCursor& cursor, const Message& message){
- lastCursor = cursor;
- lastMessage = message;
- received = true;
- return true;
- };
- void notify() {}
- void cancel() {}
- void acknowledged(const DeliveryRecord&) {}
- OwnershipToken* getSession() { return 0; }
-};
-
-class FailOnDeliver : public Deliverable
-{
- Message msg;
-public:
- FailOnDeliver() : msg(MessageUtils::createMessage()) {}
- void deliverTo(const boost::shared_ptr<Queue>& queue)
- {
- throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
- }
- Message& getMessage() { return msg; }
-};
-
-QPID_AUTO_TEST_SUITE(QueueTestSuite)
-
-QPID_AUTO_TEST_CASE(testBound){
- //test the recording of bindings, and use of those to allow a queue to be unbound
- string key("my-key");
- FieldTable args;
-
- Queue::shared_ptr queue(new Queue("my-queue"));
- ExchangeRegistry exchanges;
- //establish bindings from exchange->queue and notify the queue as it is bound:
- Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first;
- exchange1->bind(queue, key, &args);
- queue->bound(exchange1->getName(), key, args);
-
- Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first;
- exchange2->bind(queue, key, &args);
- queue->bound(exchange2->getName(), key, args);
-
- Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first;
- exchange3->bind(queue, key, &args);
- queue->bound(exchange3->getName(), key, args);
-
- //delete one of the exchanges:
- exchanges.destroy(exchange2->getName());
- exchange2.reset();
-
- //unbind the queue from all exchanges it knows it has been bound to:
- queue->unbind(exchanges);
-
- //ensure the remaining exchanges don't still have the queue bound to them:
- FailOnDeliver deliverable;
- exchange1->route(deliverable);
- exchange3->route(deliverable);
-}
-
-QPID_AUTO_TEST_CASE(testLVQ){
-
- QueueSettings settings;
- string key="key";
- settings.lvqKey = key;
- QueueFactory factory;
- Queue::shared_ptr q(factory.create("my-queue", settings));
-
- const char* values[] = { "a", "b", "c", "a"};
- for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
- qpid::types::Variant::Map properties;
- properties[key] = values[i];
- q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
- }
- BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
-
- TestConsumer::shared_ptr c(new TestConsumer("test", true));
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("2"), c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("3"), c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("4"), c->lastMessage.getContent());
-
-
- const char* values2[] = { "a", "b", "c"};
- for (size_t i = 0; i < sizeof(values2)/sizeof(values2[0]); ++i) {
- qpid::types::Variant::Map properties;
- properties[key] = values[i];
- q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+5)));
- }
- BOOST_CHECK_EQUAL(q->getMessageCount(), 3u);
-
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("5"), c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("6"), c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(std::string("7"), c->lastMessage.getContent());
-}
-
-QPID_AUTO_TEST_CASE(testLVQEmptyKey){
-
- QueueSettings settings;
- string key="key";
- settings.lvqKey = key;
- QueueFactory factory;
- Queue::shared_ptr q(factory.create("my-queue", settings));
-
-
- qpid::types::Variant::Map properties;
- properties["key"] = "a";
- q->deliver(MessageUtils::createMessage(properties, "one"));
- properties.clear();
- q->deliver(MessageUtils::createMessage(properties, "two"));
- BOOST_CHECK_EQUAL(q->getMessageCount(), 2u);
-}
-
-void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
-{
- for (uint i = 0; i < count; i++) {
- Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
- queue.deliver(m);
- }
-}
-
-QPID_AUTO_TEST_CASE(testPurgeExpired) {
- Queue queue("my-queue");
- addMessagesToQueue(10, queue);
- BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u);
- ::usleep(300*1000);
- queue.purgeExpired(0);
- BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u);
-}
-
-QPID_AUTO_TEST_CASE(testQueueCleaner) {
- boost::shared_ptr<Poller> poller(new Poller);
- Thread runner(poller.get());
- Timer timer;
- QueueRegistry queues;
- Queue::shared_ptr queue = queues.declare("my-queue", QueueSettings()).first;
- addMessagesToQueue(10, *queue, 200, 400);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u);
-
- QueueCleaner cleaner(queues, poller, &timer);
- cleaner.start(100 * qpid::sys::TIME_MSEC);
- ::usleep(300*1000);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u);
- ::usleep(300*1000);
- BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
- poller->shutdown();
- runner.join();
-}
-namespace {
-int getIntProperty(const Message& message, const std::string& key)
-{
- qpid::types::Variant v = message.getProperty(key);
- int i(0);
- if (!v.isVoid()) i = v;
- return i;
-}
-// helper for group tests
-void verifyAcquire( Queue::shared_ptr queue,
- TestConsumer::shared_ptr c,
- std::deque<QueueCursor>& results,
- const std::string& expectedGroup,
- const int expectedId )
-{
- bool success = queue->dispatch(c);
- BOOST_CHECK(success);
- if (success) {
- results.push_back(c->lastCursor);
- std::string group = c->lastMessage.getPropertyAsString("GROUP-ID");
- int id = getIntProperty(c->lastMessage, "MY-ID");
- BOOST_CHECK_EQUAL( group, expectedGroup );
- BOOST_CHECK_EQUAL( id, expectedId );
- }
-}
-
-Message createGroupMessage(int id, const std::string& group)
-{
- qpid::types::Variant::Map properties;
- properties["GROUP-ID"] = group;
- properties["MY-ID"] = id;
- return MessageUtils::createMessage(properties);
-}
-}
-
-QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
- //
- // Verify that consumers of grouped messages own the groups once a message is acquired,
- // and release the groups once all acquired messages have been dequeued or requeued
- //
- QueueSettings settings;
- settings.shareGroups = 1;
- settings.groupKey = "GROUP-ID";
- QueueFactory factory;
- Queue::shared_ptr queue(factory.create("my_queue", settings));
-
- std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
- std::string("b"), std::string("b"), std::string("b"),
- std::string("c"), std::string("c"), std::string("c") };
- for (int i = 0; i < 9; ++i) {
- queue->deliver(createGroupMessage(i, groups[i]));
- }
-
- // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
-
- BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
-
- TestConsumer::shared_ptr c1(new TestConsumer("C1"));
- TestConsumer::shared_ptr c2(new TestConsumer("C2"));
-
- queue->consume(c1);
- queue->consume(c2);
-
- std::deque<QueueCursor> dequeMeC1;
- std::deque<QueueCursor> dequeMeC2;
-
-
- verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
- verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
-
- // now let c1 complete the 'a-0' message - this should free the 'a' group
- queue->dequeue( 0, dequeMeC1.front() );
- dequeMeC1.pop_front();
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
-
- // now c2 should pick up the next 'a-1', since it is oldest free
- verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
-
- // c1 should only be able to snarf up the first "c" message now...
- verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
-
- // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
-
- // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
- queue->dequeue( 0, dequeMeC2.front() );
- dequeMeC2.pop_front();
-
- // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
-
- // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
- verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
-
- // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
-
- // c2 can now only grab a-2, and that's all
- verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
-
- // now C2 can't get any more, since C1 owns "b" and "c" group...
- bool gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
- queue->dequeue( 0, dequeMeC1.front() );
- dequeMeC1.pop_front();
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
-
- // c2 can now grab c-7
- verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
-
- // what happens if C-2 "requeues" a-1 and a-2?
- queue->release( dequeMeC2.front() );
- dequeMeC2.pop_front();
- queue->release( dequeMeC2.front() );
- dequeMeC2.pop_front(); // now just has c-7 acquired
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
-
- // now c1 will grab a-1 and a-2...
- verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
- verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
-
- // c2 can now acquire c-8 only
- verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
-
- // and c1 can get b-5
- verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
-
- // should be no more acquire-able for anyone now:
- gotOne = queue->dispatch(c1);
- BOOST_CHECK( !gotOne );
- gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- // release all of C1's acquired messages, then cancel C1
- while (!dequeMeC1.empty()) {
- queue->release(dequeMeC1.front());
- dequeMeC1.pop_front();
- }
- queue->cancel(c1);
-
- // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
- // Owners= ---, ---, ---, ---, ^C2, ^C2
-
- // b-4, a-1, a-2, b-5 all should be available, right?
- verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
-
- while (!dequeMeC2.empty()) {
- queue->dequeue(0, dequeMeC2.front());
- dequeMeC2.pop_front();
- }
-
- // Queue = a-2, b-4, b-5
- // Owners= ---, ---, ---
-
- TestConsumer::shared_ptr c3(new TestConsumer("C3"));
- queue->consume(c3);
- std::deque<QueueCursor> dequeMeC3;
-
- verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
- verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
-
- // Queue = a-2, b-4, b-5
- // Owners= ^C3, ^C2, ^C2
-
- gotOne = queue->dispatch(c3);
- BOOST_CHECK( !gotOne );
-
- verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
-
- while (!dequeMeC2.empty()) {
- queue->dequeue(0, dequeMeC2.front());
- dequeMeC2.pop_front();
- }
-
- // Queue = a-2,
- // Owners= ^C3,
- queue->deliver(createGroupMessage(9, "a"));
-
- // Queue = a-2, a-9
- // Owners= ^C3, ^C3
-
- gotOne = queue->dispatch(c2);
- BOOST_CHECK( !gotOne );
-
- queue->deliver(createGroupMessage(10, "b"));
-
- // Queue = a-2, a-9, b-10
- // Owners= ^C3, ^C3, ----
-
- verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
- verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
-
- gotOne = queue->dispatch(c3);
- BOOST_CHECK( !gotOne );
-
- queue->cancel(c2);
- queue->cancel(c3);
-}
-
-
-QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
- //
- // Verify that the same default group name is automatically applied to messages that
- // do not specify a group name.
- //
- QueueSettings settings;
- settings.shareGroups = 1;
- settings.groupKey = "GROUP-ID";
- QueueFactory factory;
- Queue::shared_ptr queue(factory.create("my_queue", settings));
-
- for (int i = 0; i < 3; ++i) {
- qpid::types::Variant::Map properties;
- // no "GROUP-ID" header
- properties["MY-ID"] = i;
- queue->deliver(MessageUtils::createMessage(properties));
- }
-
- // Queue = 0, 1, 2
-
- BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
-
- TestConsumer::shared_ptr c1(new TestConsumer("C1"));
- TestConsumer::shared_ptr c2(new TestConsumer("C2"));
-
- queue->consume(c1);
- queue->consume(c2);
-
- std::deque<QueueCursor> dequeMeC1;
- std::deque<QueueCursor> dequeMeC2;
-
- queue->dispatch(c1); // c1 now owns default group (acquired 0)
- dequeMeC1.push_back(c1->lastCursor);
- int id = getIntProperty(c1->lastMessage, "MY-ID");
- BOOST_CHECK_EQUAL( id, 0 );
-
- bool gotOne = queue->dispatch(c2); // c2 should get nothing
- BOOST_CHECK( !gotOne );
-
- queue->dispatch(c1); // c1 now acquires 1
- dequeMeC1.push_back(c1->lastCursor);
- id = getIntProperty(c1->lastMessage, "MY-ID");
- BOOST_CHECK_EQUAL( id, 1 );
-
- gotOne = queue->dispatch(c2); // c2 should still get nothing
- BOOST_CHECK( !gotOne );
-
- while (!dequeMeC1.empty()) {
- queue->dequeue(0, dequeMeC1.front());
- dequeMeC1.pop_front();
- }
-
- // now default group should be available...
- queue->dispatch(c2); // c2 now owns default group (acquired 2)
- id = c2->lastMessage.getProperty("MY-ID");
- BOOST_CHECK_EQUAL( id, 2 );
-
- gotOne = queue->dispatch(c1); // c1 should get nothing
- BOOST_CHECK( !gotOne );
-
- queue->cancel(c1);
- queue->cancel(c2);
-}
-
-QPID_AUTO_TEST_CASE(testSetPositionFifo) {
- Queue::shared_ptr q(new Queue("my-queue", true));
- BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
- for (int i = 0; i < 10; ++i)
- q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), boost::lexical_cast<string>(i+1)));
-
- // Verify the front of the queue
- TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence()); // Numbered from 1
- BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
-
- // Verify the back of the queue
- BOOST_CHECK_EQUAL(10u, q->getPosition());
- BOOST_CHECK_EQUAL(10u, q->getMessageCount());
-
- // Using setPosition to introduce a gap in sequence numbers.
- q->setPosition(15);
- BOOST_CHECK_EQUAL(10u, q->getMessageCount());
- BOOST_CHECK_EQUAL(15u, q->getPosition());
- q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "16"));
-
- q->seek(*c, Queue::MessagePredicate(), 9);
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(10u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("10", c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(16u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("16", c->lastMessage.getContent());
-
- // Using setPosition to trunkcate the queue
- q->setPosition(5);
- BOOST_CHECK_EQUAL(5u, q->getMessageCount());
- q->deliver(MessageUtils::createMessage(qpid::types::Variant::Map(), "6a"));
- c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
- q->seek(*c, Queue::MessagePredicate(), 4);
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("5", c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(6u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("6a", c->lastMessage.getContent());
- BOOST_CHECK(!q->dispatch(c)); // No more messages.
-}
-
-QPID_AUTO_TEST_CASE(testSetPositionLvq) {
- QueueSettings settings;
- string key="key";
- settings.lvqKey = key;
- QueueFactory factory;
- Queue::shared_ptr q(factory.create("my-queue", settings));
-
- const char* values[] = { "a", "b", "c", "a", "b", "c" };
- for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
- qpid::types::Variant::Map properties;
- properties[key] = values[i];
- q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
- }
- BOOST_CHECK_EQUAL(3u, q->getMessageCount());
- // Verify the front of the queue
- TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence()); // Numbered from 1
- BOOST_CHECK_EQUAL("4", c->lastMessage.getContent());
- // Verify the back of the queue
- BOOST_CHECK_EQUAL(6u, q->getPosition());
-
- q->setPosition(5);
-
- c = boost::shared_ptr<TestConsumer>(new TestConsumer("test", false)); // Don't acquire
- q->seek(*c, Queue::MessagePredicate(), 4);
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(5u, c->lastMessage.getSequence()); // Numbered from 1
- BOOST_CHECK(!q->dispatch(c));
-}
-
-QPID_AUTO_TEST_CASE(testSetPositionPriority) {
- QueueSettings settings;
- settings.priorities = 10;
- QueueFactory factory;
- Queue::shared_ptr q(factory.create("my-queue", settings));
-
- const int priorities[] = { 1, 2, 3, 2, 1, 3 };
- for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
- qpid::types::Variant::Map properties;
- properties["priority"] = priorities[i];
- q->deliver(MessageUtils::createMessage(properties, boost::lexical_cast<string>(i+1)));
- }
-
- // Truncation removes messages in fifo order, not priority order.
- q->setPosition(3);
- TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in priority order
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
- BOOST_CHECK(!q->dispatch(c));
-
- qpid::types::Variant::Map properties;
- properties["priority"] = 4;
- q->deliver(MessageUtils::createMessage(properties, "4a"));
-
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
-
- // But consumers see priority order
- c.reset(new TestConsumer("test", true));
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(4u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("4a", c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(3u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("3", c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(2u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("2", c->lastMessage.getContent());
- BOOST_CHECK(q->dispatch(c));
- BOOST_CHECK_EQUAL(1u, c->lastMessage.getSequence());
- BOOST_CHECK_EQUAL("1", c->lastMessage.getContent());
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests