summaryrefslogtreecommitdiff
path: root/cpp/src/tests/QueuePolicyTest.cpp
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/tests/QueuePolicyTest.cpp
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-rajith_jms_client.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueuePolicyTest.cpp')
-rw-r--r--cpp/src/tests/QueuePolicyTest.cpp406
1 files changed, 0 insertions, 406 deletions
diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp
deleted file mode 100644
index 5455105078..0000000000
--- a/cpp/src/tests/QueuePolicyTest.cpp
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <sstream>
-#include "unit_test.h"
-#include "test_tools.h"
-
-#include "qpid/broker/QueuePolicy.h"
-#include "qpid/broker/QueueFlowLimit.h"
-#include "qpid/client/QueueOptions.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "MessageUtils.h"
-#include "BrokerFixture.h"
-
-using namespace qpid::broker;
-using namespace qpid::client;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace tests {
-
-QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite)
-
-namespace {
-QueuedMessage createMessage(uint32_t size)
-{
- QueuedMessage msg;
- msg.payload = MessageUtils::createMessage();
- MessageUtils::addContent(msg.payload, std::string (size, 'x'));
- return msg;
-}
-}
-
-QPID_AUTO_TEST_CASE(testCount)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 0));
- BOOST_CHECK_EQUAL((uint64_t) 0, policy->getMaxSize());
- BOOST_CHECK_EQUAL((uint32_t) 5, policy->getMaxCount());
-
- QueuedMessage msg = createMessage(10);
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message");
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on enqueuing sixth message (after dequeue)");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testSize)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 0, 50));
- QueuedMessage msg = createMessage(10);
-
- for (size_t i = 0; i < 5; i++) {
- policy->tryEnqueue(msg.payload);
- }
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- policy->dequeued(msg);
- policy->tryEnqueue(msg.payload);
-
- try {
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50 (after dequeue). " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testBoth)
-{
- std::auto_ptr<QueuePolicy> policy(QueuePolicy::createQueuePolicy("test", 5, 50));
- try {
- QueuedMessage msg = createMessage(51);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on single message exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
- std::vector<QueuedMessage> messages;
- messages.push_back(createMessage(15));
- messages.push_back(createMessage(10));
- messages.push_back(createMessage(11));
- messages.push_back(createMessage(2));
- messages.push_back(createMessage(7));
- for (size_t i = 0; i < messages.size(); i++) {
- policy->tryEnqueue(messages[i].payload);
- }
- //size = 45 at this point, count = 5
- try {
- QueuedMessage msg = createMessage(5);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on count exceeding 6. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
- try {
- QueuedMessage msg = createMessage(10);
- policy->tryEnqueue(msg.payload);
- BOOST_FAIL("Policy did not fail on aggregate size exceeding 50. " << *policy);
- } catch (const ResourceLimitExceededException&) {}
-
-
- policy->dequeued(messages[0]);
- try {
- QueuedMessage msg = createMessage(20);
- policy->tryEnqueue(msg.payload);
- } catch (const ResourceLimitExceededException&) {
- BOOST_FAIL("Policy failed incorrectly after dequeue. " << *policy);
- }
-}
-
-QPID_AUTO_TEST_CASE(testSettings)
-{
- //test reading and writing the policy from/to field table
- std::auto_ptr<QueuePolicy> a(QueuePolicy::createQueuePolicy("test", 101, 303));
- FieldTable settings;
- a->update(settings);
- std::auto_ptr<QueuePolicy> b(QueuePolicy::createQueuePolicy("test", settings));
- BOOST_CHECK_EQUAL(a->getMaxCount(), b->getMaxCount());
- BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
-}
-
-QPID_AUTO_TEST_CASE(testRingPolicyCount)
-{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
- policy->update(args);
-
- ProxySessionFixture f;
- std::string q("my-ring-queue");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- for (int i = 0; i < 10; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- client::Message msg;
- for (int i = 5; i < 10; i++) {
- BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
- }
- BOOST_CHECK(!f.subs.get(msg, q));
-
- for (int i = 10; i < 20; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 15; i < 20; i++) {
- BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL((boost::format("%1%_%2%") % "Message" % (i+1)).str(), msg.getData());
- }
- BOOST_CHECK(!f.subs.get(msg, q));
-}
-
-QPID_AUTO_TEST_CASE(testRingPolicySize)
-{
- std::string hundredBytes = std::string(100, 'h');
- std::string fourHundredBytes = std::string (400, 'f');
- std::string thousandBytes = std::string(1000, 't');
-
- // Ring queue, 500 bytes maxSize
-
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
- policy->update(args);
-
- ProxySessionFixture f;
- std::string q("my-ring-queue");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
-
- // A. Send messages 0 .. 5, each 100 bytes
-
- client::Message m(hundredBytes, q);
-
- for (int i = 0; i < 6; i++) {
- std::stringstream id;
- id << i;
- m.getMessageProperties().setCorrelationId(id.str());
- f.session.messageTransfer(arg::content=m);
- }
-
- // should find 1 .. 5 on the queue, 0 is displaced by 5
- client::Message msg;
- for (int i = 1; i < 6; i++) {
- std::stringstream id;
- id << i;
- BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
- }
- BOOST_CHECK(!f.subs.get(msg, q));
-
- // B. Now make sure that one 400 byte message displaces four 100 byte messages
-
- // Send messages 0 .. 5, each 100 bytes
- for (int i = 0; i < 6; i++) {
- client::Message m(hundredBytes, q);
- std::stringstream id;
- id << i;
- m.getMessageProperties().setCorrelationId(id.str());
- f.session.messageTransfer(arg::content=m);
- }
-
- // Now send one 400 byte message
- client::Message m2(fourHundredBytes, q);
- m2.getMessageProperties().setCorrelationId("6");
- f.session.messageTransfer(arg::content=m2);
-
- // expect to see 5, 6 on the queue
- for (int i = 5; i < 7; i++) {
- std::stringstream id;
- id << i;
- BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
- BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
- }
- BOOST_CHECK(!f.subs.get(msg, q));
-
-
- // C. Try sending a 1000-byte message, should fail - exceeds maxSize of queue
-
- client::Message m3(thousandBytes, q);
- m3.getMessageProperties().setCorrelationId("6");
- try {
- ScopedSuppressLogging sl;
- f.session.messageTransfer(arg::content=m3);
- BOOST_FAIL("Ooops - successfully added a 1000 byte message to a 512 byte ring queue ...");
- }
- catch (...) {
- }
-
-}
-
-
-QPID_AUTO_TEST_CASE(testStrictRingPolicy)
-{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING_STRICT);
- policy->update(args);
-
- ProxySessionFixture f;
- std::string q("my-ring-queue");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- LocalQueue incoming;
- SubscriptionSettings settings(FlowControl::unlimited());
- settings.autoAck = 0; // no auto ack.
- Subscription sub = f.subs.subscribe(incoming, q, settings);
- for (int i = 0; i < 5; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 0; i < 5; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- try {
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- f.session.messageTransfer(arg::content=client::Message("Message_6", q));
- BOOST_FAIL("expecting ResourceLimitExceededException.");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testPolicyWithDtx)
-{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
-
- ProxySessionFixture f;
- std::string q("my-policy-queue");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- LocalQueue incoming;
- SubscriptionSettings settings(FlowControl::unlimited());
- settings.autoAck = 0; // no auto ack.
- Subscription sub = f.subs.subscribe(incoming, q, settings);
- f.session.dtxSelect();
- Xid tx1(1, "test-dtx-mgr", "tx1");
- f.session.dtxStart(arg::xid=tx1);
- for (int i = 0; i < 5; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- f.session.dtxEnd(arg::xid=tx1);
- f.session.dtxCommit(arg::xid=tx1, arg::onePhase=true);
-
- Xid tx2(1, "test-dtx-mgr", "tx2");
- f.session.dtxStart(arg::xid=tx2);
- for (int i = 0; i < 5; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- SequenceSet accepting=sub.getUnaccepted();
- f.session.messageAccept(accepting);
- f.session.dtxEnd(arg::xid=tx2);
- f.session.dtxPrepare(arg::xid=tx2);
- f.session.dtxRollback(arg::xid=tx2);
- f.session.messageRelease(accepting);
-
- Xid tx3(1, "test-dtx-mgr", "tx3");
- f.session.dtxStart(arg::xid=tx3);
- for (int i = 0; i < 5; i++) {
- incoming.pop();
- }
- accepting=sub.getUnaccepted();
- f.session.messageAccept(accepting);
- f.session.dtxEnd(arg::xid=tx3);
- f.session.dtxPrepare(arg::xid=tx3);
-
- Session other = f.connection.newSession();
- try {
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- other.messageTransfer(arg::content=client::Message("Message_6", q));
- BOOST_FAIL("expecting ResourceLimitExceededException.");
- } catch (const ResourceLimitExceededException&) {}
-
- f.session.dtxCommit(arg::xid=tx3);
- //now retry and this time should succeed
- other = f.connection.newSession();
- other.messageTransfer(arg::content=client::Message("Message_6", q));
-}
-
-QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore)
-{
- //Ensure that with no store loaded, we don't flow to disk but
- //fallback to rejecting messages
- QueueOptions args;
- args.setSizePolicy(FLOW_TO_DISK, 0, 5);
- // Disable flow control, or else we'll never hit the max limit
- args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-
- ProxySessionFixture f;
- std::string q("my-queue");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- LocalQueue incoming;
- SubscriptionSettings settings(FlowControl::unlimited());
- settings.autoAck = 0; // no auto ack.
- Subscription sub = f.subs.subscribe(incoming, q, settings);
- for (int i = 0; i < 5; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- for (int i = 0; i < 5; i++) {
- BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
- }
- try {
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- f.session.messageTransfer(arg::content=client::Message("Message_6", q));
- BOOST_FAIL("expecting ResourceLimitExceededException.");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_CASE(testPolicyFailureOnCommit)
-{
- FieldTable args;
- std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::REJECT);
- policy->update(args);
-
- ProxySessionFixture f;
- std::string q("q");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- f.session.txSelect();
- for (int i = 0; i < 10; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- BOOST_CHECK_THROW(f.session.txCommit(), InternalErrorException);
-}
-
-QPID_AUTO_TEST_CASE(testCapacityConversion)
-{
- FieldTable args;
- args.setString("qpid.max_count", "5");
- args.setString("qpid.flow_stop_count", "0");
-
- ProxySessionFixture f;
- std::string q("q");
- f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
- for (int i = 0; i < 5; i++) {
- f.session.messageTransfer(arg::content=client::Message((boost::format("%1%_%2%") % "Message" % (i+1)).str(), q));
- }
- try {
- ScopedSuppressLogging sl; // Suppress messages for expected errors.
- f.session.messageTransfer(arg::content=client::Message("Message_6", q));
- BOOST_FAIL("expecting ResourceLimitExceededException.");
- } catch (const ResourceLimitExceededException&) {}
-}
-
-QPID_AUTO_TEST_SUITE_END()
-
-}} // namespace qpid::tests