diff options
author | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-09-21 20:39:40 +0000 |
commit | 03fa9018260242f08d2164f06875fc708fdbf4c7 (patch) | |
tree | 47bcba0feec9e95ecb06dfb0379b7abdf41c30f8 /cpp/src/tests/QueuePolicyTest.cpp | |
parent | 558e92d6c9cf8dfb875c1250ab8fe1cefaf30b05 (diff) | |
download | qpid-python-03fa9018260242f08d2164f06875fc708fdbf4c7.tar.gz |
Refactoring of queue/queue-policy:
- moved some logic out of Queue.cpp into QueuePolicy.cpp
- moved QueuedMessage definition into its own header file
- added checks for requeue and dequeue
- split QueuePolicy logic into different sub classes
Added ability to request old messages to be discareded to make room for new ones when configured limit has been reached.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697603 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueuePolicyTest.cpp')
-rw-r--r-- | cpp/src/tests/QueuePolicyTest.cpp | 147 |
1 files changed, 113 insertions, 34 deletions
diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index db88682010..4267047c3f 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -19,63 +19,142 @@ * */ #include "qpid/broker/QueuePolicy.h" +#include "qpid/sys/Time.h" #include "unit_test.h" +#include "MessageUtils.h" +#include "BrokerFixture.h" using namespace qpid::broker; +using namespace qpid::client; using namespace qpid::framing; QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite) +QueuedMessage createMessage(uint32_t size) +{ + QueuedMessage msg; + msg.payload = MessageUtils::createMessage(); + MessageUtils::addContent(msg.payload, std::string (size, 'x')); + return msg; +} + + QPID_AUTO_TEST_CASE(testCount) { - QueuePolicy policy(5, 0); - BOOST_CHECK(!policy.limitExceeded()); - for (int i = 0; i < 5; i++) policy.enqueued(10); - BOOST_CHECK_EQUAL((uint64_t) 0, policy.getMaxSize()); - BOOST_CHECK_EQUAL((uint32_t) 5, policy.getMaxCount()); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 0)); + BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize()); + BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount()); + + QueuedMessage msg = createMessage(10); + for (size_t i = 0; i < 5; i++) { + policy->tryEnqueue(msg); + } + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on enqueuing sixth message"); + } catch (const ResourceLimitExceededException&) {} + + policy->dequeued(msg); + policy->tryEnqueue(msg); + + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)"); + } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testSize) { - QueuePolicy policy(0, 50); - for (int i = 0; i < 5; i++) policy.enqueued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(10); - BOOST_CHECK(!policy.limitExceeded()); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(0, 50)); + QueuedMessage msg = createMessage(10); + + for (size_t i = 0; i < 5; i++) { + policy->tryEnqueue(msg); + } + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + policy->dequeued(msg); + policy->tryEnqueue(msg); + + try { + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy); + } catch (const ResourceLimitExceededException&) {} } QPID_AUTO_TEST_CASE(testBoth) { - QueuePolicy policy(5, 50); - for (int i = 0; i < 5; i++) policy.enqueued(11); - BOOST_CHECK(policy.limitExceeded()); - policy.dequeued(20); - BOOST_CHECK(!policy.limitExceeded());//fails - policy.enqueued(5); - policy.enqueued(10); - BOOST_CHECK(policy.limitExceeded()); + std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy(5, 50)); + try { + QueuedMessage msg = createMessage(51); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + std::vector<QueuedMessage> messages; + messages.push_back(createMessage(15)); + messages.push_back(createMessage(10)); + messages.push_back(createMessage(11)); + messages.push_back(createMessage(2)); + messages.push_back(createMessage(7)); + for (size_t i = 0; i < messages.size(); i++) { + policy->tryEnqueue(messages[i]); + } + //size = 45 at this point, count = 5 + try { + QueuedMessage msg = createMessage(5); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy); + } catch (const ResourceLimitExceededException&) {} + try { + QueuedMessage msg = createMessage(10); + policy->tryEnqueue(msg); + BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy); + } catch (const ResourceLimitExceededException&) {} + + + policy->dequeued(messages[0]); + try { + QueuedMessage msg = createMessage(20); + policy->tryEnqueue(msg); + } catch (const ResourceLimitExceededException&) { + BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy); + } } QPID_AUTO_TEST_CASE(testSettings) { //test reading and writing the policy from/to field table + std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy(101, 303)); FieldTable settings; - QueuePolicy a(101, 303); - a.update(settings); - QueuePolicy b(settings); - BOOST_CHECK_EQUAL(a.getMaxCount(), b.getMaxCount()); - BOOST_CHECK_EQUAL(a.getMaxSize(), b.getMaxSize()); + a->update(settings); + std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy(settings)); + BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount()); + BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize()); +} + +QPID_AUTO_TEST_CASE(testRingPolicy) +{ + FieldTable args; + std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy(5, 0, QueuePolicy::RING); + policy->update(args); + + ProxySessionFixture f; + std::string q("my-ring-queue"); + f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + for (int i = 0; i < 10; i++) { + f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q)); + } + client::Message msg; + for (int i = 5; i < 10; i++) { + BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData()); + } + BOOST_CHECK(!f.subs.get(msg, q)); } + QPID_AUTO_TEST_SUITE_END() |