diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-13 18:11:43 +0000 |
commit | 7e34266b9a23f4536415bfbc3f161b84615b6550 (patch) | |
tree | 484008cf2d413f58b5e4ab80b373303c66200888 /RC9/qpid/cpp/src/tests/QueueTest.cpp | |
parent | 4612263ea692f00a4bd810438bdaf9bc88022091 (diff) | |
download | qpid-python-M4.tar.gz |
Tag M4 RC9M4
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@734202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC9/qpid/cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | RC9/qpid/cpp/src/tests/QueueTest.cpp | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/src/tests/QueueTest.cpp b/RC9/qpid/cpp/src/tests/QueueTest.cpp new file mode 100644 index 0000000000..f1771e26cd --- /dev/null +++ b/RC9/qpid/cpp/src/tests/QueueTest.cpp @@ -0,0 +1,501 @@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "unit_test.h" +#include "qpid/Exception.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Deliverable.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/NullMessageStore.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/client/QueueOptions.h" +#include <iostream> +#include "boost/format.hpp" + +using boost::intrusive_ptr; +using namespace qpid; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +class TestConsumer : public virtual Consumer{ +public: + typedef boost::shared_ptr<TestConsumer> shared_ptr; + + intrusive_ptr<Message> last; + bool received; + TestConsumer(bool acquire = true):Consumer(acquire), received(false) {}; + + virtual bool deliver(QueuedMessage& msg){ + last = msg.payload; + received = true; + return true; + }; + void notify() {} + OwnershipToken* getSession() { return 0; } +}; + +class FailOnDeliver : public Deliverable +{ + Message msg; +public: + void deliverTo(const boost::shared_ptr<Queue>& queue) + { + throw Exception(QPID_MSG("Invalid delivery to " << queue->getName())); + } + Message& getMessage() { return msg; } +}; + +intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) { + intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; +} + +QPID_AUTO_TEST_SUITE(QueueTestSuite) + +QPID_AUTO_TEST_CASE(testAsyncMessage) { + Queue::shared_ptr queue(new Queue("my_test_queue", true)); + intrusive_ptr<Message> received; + + TestConsumer::shared_ptr c1(new TestConsumer()); + queue->consume(c1); + + + //Test basic delivery: + intrusive_ptr<Message> msg1 = create_message("e", "A"); + msg1->enqueueAsync();//this is done on enqueue which is not called from process + queue->process(msg1); + sleep(2); + + BOOST_CHECK(!c1->received); + msg1->enqueueComplete(); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg1.get(), received.get()); +} + + +QPID_AUTO_TEST_CASE(testAsyncMessageCount){ + Queue::shared_ptr queue(new Queue("my_test_queue", true)); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + msg1->enqueueAsync();//this is done on enqueue which is not called from process + + queue->process(msg1); + sleep(2); + uint32_t compval=0; + BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); + msg1->enqueueComplete(); + compval=1; + BOOST_CHECK_EQUAL(compval, queue->getMessageCount()); +} + +QPID_AUTO_TEST_CASE(testConsumers){ + Queue::shared_ptr queue(new Queue("my_queue", true)); + + //Test adding consumers: + TestConsumer::shared_ptr c1(new TestConsumer()); + TestConsumer::shared_ptr c2(new TestConsumer()); + queue->consume(c1); + queue->consume(c2); + + BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); + + //Test basic delivery: + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + + queue->deliver(msg1); + BOOST_CHECK(queue->dispatch(c1)); + BOOST_CHECK_EQUAL(msg1.get(), c1->last.get()); + + queue->deliver(msg2); + BOOST_CHECK(queue->dispatch(c2)); + BOOST_CHECK_EQUAL(msg2.get(), c2->last.get()); + + c1->received = false; + queue->deliver(msg3); + BOOST_CHECK(queue->dispatch(c1)); + BOOST_CHECK_EQUAL(msg3.get(), c1->last.get()); + + //Test cancellation: + queue->cancel(c1); + BOOST_CHECK_EQUAL(uint32_t(1), queue->getConsumerCount()); + queue->cancel(c2); + BOOST_CHECK_EQUAL(uint32_t(0), queue->getConsumerCount()); +} + +QPID_AUTO_TEST_CASE(testRegistry){ + //Test use of queues in registry: + QueueRegistry registry; + registry.declare("queue1", true, true); + registry.declare("queue2", true, true); + registry.declare("queue3", true, true); + + BOOST_CHECK(registry.find("queue1")); + BOOST_CHECK(registry.find("queue2")); + BOOST_CHECK(registry.find("queue3")); + + registry.destroy("queue1"); + registry.destroy("queue2"); + registry.destroy("queue3"); + + BOOST_CHECK(!registry.find("queue1")); + BOOST_CHECK(!registry.find("queue2")); + BOOST_CHECK(!registry.find("queue3")); +} + +QPID_AUTO_TEST_CASE(testDequeue){ + Queue::shared_ptr queue(new Queue("my_queue", true)); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> received; + + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg1.get(), received.get()); + BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg2.get(), received.get()); + BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount()); + + TestConsumer::shared_ptr consumer(new TestConsumer()); + queue->consume(consumer); + queue->dispatch(consumer); + if (!consumer->received) + sleep(2); + + BOOST_CHECK_EQUAL(msg3.get(), consumer->last.get()); + BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); + + received = queue->get().payload; + BOOST_CHECK(!received); + BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount()); + +} + +QPID_AUTO_TEST_CASE(testBound) +{ + //test the recording of bindings, and use of those to allow a queue to be unbound + string key("my-key"); + FieldTable args; + + Queue::shared_ptr queue(new Queue("my-queue", true)); + ExchangeRegistry exchanges; + //establish bindings from exchange->queue and notify the queue as it is bound: + Exchange::shared_ptr exchange1 = exchanges.declare("my-exchange-1", "direct").first; + exchange1->bind(queue, key, &args); + queue->bound(exchange1->getName(), key, args); + + Exchange::shared_ptr exchange2 = exchanges.declare("my-exchange-2", "fanout").first; + exchange2->bind(queue, key, &args); + queue->bound(exchange2->getName(), key, args); + + Exchange::shared_ptr exchange3 = exchanges.declare("my-exchange-3", "topic").first; + exchange3->bind(queue, key, &args); + queue->bound(exchange3->getName(), key, args); + + //delete one of the exchanges: + exchanges.destroy(exchange2->getName()); + exchange2.reset(); + + //unbind the queue from all exchanges it knows it has been bound to: + queue->unbind(exchanges, queue); + + //ensure the remaining exchanges don't still have the queue bound to them: + FailOnDeliver deliverable; + exchange1->route(deliverable, key, &args); + exchange3->route(deliverable, key, &args); +} + +QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ + + client::QueueOptions args; + args.setPersistLastNode(); + + Queue::shared_ptr queue(new Queue("my-queue", true)); + queue->configure(args); + + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + + //enqueue 2 messages + queue->deliver(msg1); + queue->deliver(msg2); + + //change mode + queue->setLastNodeFailure(); + + //enqueue 1 message + queue->deliver(msg3); + + //check all have persistent ids. + BOOST_CHECK(msg1->isPersistent()); + BOOST_CHECK(msg2->isPersistent()); + BOOST_CHECK(msg3->isPersistent()); + +} + +class TestMessageStoreOC : public NullMessageStore +{ + public: + + virtual void dequeue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& /*msg*/, + const PersistableQueue& /*queue*/) + { + } + + virtual void enqueue(TransactionContext*, + const boost::intrusive_ptr<PersistableMessage>& /*msg*/, + const PersistableQueue& /* queue */) + { + } + + TestMessageStoreOC() : NullMessageStore() {} + ~TestMessageStoreOC(){} +}; + + +QPID_AUTO_TEST_CASE(testLVQOrdering){ + + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); + + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); + + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> received; + + //set deliever match for LVQ a,b,c,a + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + + //enqueue 4 message + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + queue->deliver(msg4); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg4.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg2.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg3.get(), received.get()); + + intrusive_ptr<Message> msg5 = create_message("e", "A"); + intrusive_ptr<Message> msg6 = create_message("e", "B"); + intrusive_ptr<Message> msg7 = create_message("e", "C"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + queue->deliver(msg5); + queue->deliver(msg6); + queue->deliver(msg7); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg5.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg6.get(), received.get()); + + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg7.get(), received.get()); + +} + +QPID_AUTO_TEST_CASE(testLVQAcquire){ + + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); + + Queue::shared_ptr queue(new Queue("my-queue", true )); + queue->configure(args); + + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> msg5 = create_message("e", "F"); + intrusive_ptr<Message> msg6 = create_message("e", "G"); + + //set deliever match for LVQ a,b,c,a + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); + msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); + + //enqueue 4 message + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + queue->deliver(msg4); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + framing::SequenceNumber sequence(1); + QueuedMessage qmsg(queue.get(), msg1, sequence); + QueuedMessage qmsg2(queue.get(), msg2, ++sequence); + + BOOST_CHECK(!queue->acquire(qmsg)); + BOOST_CHECK(queue->acquire(qmsg2)); + + BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u); + + queue->deliver(msg5); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + // set mode to no browse and check + args.setOrdering(client::LVQ_NO_BROWSE); + queue->configure(args); + TestConsumer::shared_ptr c1(new TestConsumer(false)); + + queue->dispatch(c1); + queue->dispatch(c1); + queue->dispatch(c1); + + queue->deliver(msg6); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u); + + intrusive_ptr<Message> received; + received = queue->get().payload; + BOOST_CHECK_EQUAL(msg4.get(), received.get()); + +} + +QPID_AUTO_TEST_CASE(testLVQMultiQueue){ + + client::QueueOptions args; + // set queue mode + args.setOrdering(client::LVQ); + + Queue::shared_ptr queue1(new Queue("my-queue", true )); + Queue::shared_ptr queue2(new Queue("my-queue", true )); + intrusive_ptr<Message> received; + queue1->configure(args); + queue2->configure(args); + + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "A"); + + string key; + args.getLVQKey(key); + BOOST_CHECK_EQUAL(key, "qpid.LVQ_key"); + + msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); + + queue1->deliver(msg1); + queue2->deliver(msg1); + queue1->deliver(msg2); + + received = queue1->get().payload; + BOOST_CHECK_EQUAL(msg2.get(), received.get()); + + received = queue2->get().payload; + BOOST_CHECK_EQUAL(msg1.get(), received.get()); + +} + +void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) +{ + for (uint i = 0; i < count; i++) { + intrusive_ptr<Message> m = create_message("exchange", "key"); + if (i % 2) { + if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); + } else { + if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); + } + m->setTimestamp(); + queue.deliver(m); + } +} + +QPID_AUTO_TEST_CASE(testPurgeExpired) { + Queue queue("my-queue"); + addMessagesToQueue(10, queue); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); + ::usleep(300*1000); + queue.purgeExpired(); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); +} + +QPID_AUTO_TEST_CASE(testQueueCleaner) { + Timer timer; + QueueRegistry queues; + Queue::shared_ptr queue = queues.declare("my-queue").first; + addMessagesToQueue(10, *queue, 200, 400); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); + + QueueCleaner cleaner(queues, timer); + cleaner.start(100 * qpid::sys::TIME_MSEC); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); +} + +QPID_AUTO_TEST_SUITE_END() + + |