diff options
author | Gordon Sim <gsim@apache.org> | 2008-10-13 17:09:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-10-13 17:09:06 +0000 |
commit | d2030e43d1d2a68c37ed7c780f336f81a8b16064 (patch) | |
tree | 04f700ed0325eee6bfa5f750a3cc217c705f6c96 /cpp/src/tests | |
parent | 81e78eba4d75597a470356eebde654f14acdcc87 (diff) | |
download | qpid-python-d2030e43d1d2a68c37ed7c780f336f81a8b16064.tar.gz |
Periodically purge expired messages from queues
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704166 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 40 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 39 |
2 files changed, 79 insertions, 0 deletions
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 85497ace5d..440605a2e4 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -42,6 +42,7 @@ using namespace qpid; using qpid::sys::Monitor; using qpid::sys::Thread; using qpid::sys::TIME_SEC; +using qpid::broker::Broker; using std::string; using std::cout; using std::endl; @@ -94,6 +95,8 @@ struct SimpleListener : public MessageListener struct ClientSessionFixture : public ProxySessionFixture { + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {} + void declareSubscribe(const string& q="my-queue", const string& dest="my-dest") { @@ -282,6 +285,43 @@ QPID_AUTO_TEST_CASE(testOpenFailure) { BOOST_CHECK(!c.isOpen()); } +QPID_AUTO_TEST_CASE(testPeriodicExpiration) { + Broker::Options opts; + opts.queueCleanInterval = 1; + ClientSessionFixture fix(opts); + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(500); + fix.session.messageTransfer(arg::content=m); + } + + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); + sleep(2); + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); +} + +QPID_AUTO_TEST_CASE(testExpirationOnPop) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(200); + fix.session.messageTransfer(arg::content=m); + } + + ::usleep(300* 1000); + + for (uint i = 0; i < 10; i++) { + if (i % 2) continue; + Message m; + BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + } +} + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index ccc2fc2391..ef8aa69dd6 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -20,6 +20,7 @@ */ #include "unit_test.h" #include "qpid/Exception.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/ExchangeRegistry.h" @@ -460,6 +461,44 @@ QPID_AUTO_TEST_CASE(testLVQSaftyCheck){ } +void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) +{ + for (uint i = 0; i < count; i++) { + intrusive_ptr<Message> m = message("exchange", "key"); + if (i % 2) { + if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); + } else { + if (evenTtl) m->getProperties<DeliveryProperties>()->setTtl(evenTtl); + } + m->setTimestamp(); + queue.deliver(m); + } +} + +QPID_AUTO_TEST_CASE(testPurgeExpired) { + Queue queue("my-queue"); + addMessagesToQueue(10, queue); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 10u); + ::usleep(300*1000); + queue.purgeExpired(); + BOOST_CHECK_EQUAL(queue.getMessageCount(), 5u); +} + +QPID_AUTO_TEST_CASE(testQueueCleaner) { + Timer timer; + QueueRegistry queues; + Queue::shared_ptr queue = queues.declare("my-queue").first; + addMessagesToQueue(10, *queue, 200, 400); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 10u); + + QueueCleaner cleaner(queues, timer); + cleaner.start(100 * qpid::sys::TIME_MSEC); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 5u); + ::usleep(300*1000); + BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); +} + QPID_AUTO_TEST_SUITE_END() |