summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/MessagingSessionTests.cpp
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-17 16:21:13 +0000
committerAidan Skinner <aidan@apache.org>2009-09-17 16:21:13 +0000
commit7d6a028be9f6c47418e98a6fa74a359864428150 (patch)
tree78083427e88c48edb3b23605309e57f1301cba19 /qpid/cpp/src/tests/MessagingSessionTests.cpp
parent31bbc100ac6b3a31eb25d29f407d60ff23334d1f (diff)
downloadqpid-python-7d6a028be9f6c47418e98a6fa74a359864428150.tar.gz
Merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816261 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp119
1 files changed, 100 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 4ee27f0764..f5a5420d3a 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,6 +36,9 @@
#include <string>
#include <vector>
+namespace qpid {
+namespace tests {
+
QPID_AUTO_TEST_SUITE(MessagingSessionTests)
using namespace qpid::messaging;
@@ -86,7 +89,7 @@ struct MessagingFixture : public BrokerFixture
Session session;
BrokerAdmin admin;
- MessagingFixture(Broker::Options opts = Broker::Options()) :
+ MessagingFixture(Broker::Options opts = Broker::Options()) :
BrokerFixture(opts),
connection(Connection::open((boost::format("amqp:tcp:localhost:%1%") % (broker->getPort(Broker::TCP_TRANSPORT))).str())),
session(connection.newSession()),
@@ -133,7 +136,7 @@ struct TopicFixture : MessagingFixture
struct MultiQueueFixture : MessagingFixture
{
- typedef std::vector<std::string>::const_iterator const_iterator;
+ typedef std::vector<std::string>::const_iterator const_iterator;
std::vector<std::string> queues;
MultiQueueFixture(const std::vector<std::string>& names = boost::assign::list_of<std::string>("q1")("q2")("q3")) : queues(names)
@@ -161,7 +164,7 @@ struct MessageDataCollector : MessageListener
}
};
-std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
+std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5)
{
std::vector<std::string> data;
Message message;
@@ -189,13 +192,12 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders)
Sender sender = fix.session.createSender(fix.queue);
Message out("test-message");
for (uint i = 0; i < 10; ++i) {
- out.getHeaders()["a"] = i;
+ out.getHeaders()["a"] = i;
sender.send(out);
}
Receiver receiver = fix.session.createReceiver(fix.queue);
Message in;
for (uint i = 0; i < 10; ++i) {
- //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC));
BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes());
BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i);
@@ -249,14 +251,14 @@ QPID_AUTO_TEST_CASE(testSimpleTopic)
Message in;
BOOST_CHECK(!sub2.fetch(in, 0));//TODO: or should this raise an error?
-
+
//TODO: check pending messages...
}
QPID_AUTO_TEST_CASE(testSessionFetch)
{
MultiQueueFixture fix;
-
+
for (uint i = 0; i < fix.queues.size(); i++) {
Receiver r = fix.session.createReceiver(fix.queues[i]);
r.setCapacity(10u);
@@ -267,8 +269,8 @@ QPID_AUTO_TEST_CASE(testSessionFetch)
Sender s = fix.session.createSender(fix.queues[i]);
Message msg((boost::format("Message_%1%") % (i+1)).str());
s.send(msg);
- }
-
+ }
+
for (uint i = 0; i < fix.queues.size(); i++) {
Message msg;
BOOST_CHECK(fix.session.fetch(msg, qpid::sys::TIME_SEC));
@@ -279,7 +281,7 @@ QPID_AUTO_TEST_CASE(testSessionFetch)
QPID_AUTO_TEST_CASE(testSessionDispatch)
{
MultiQueueFixture fix;
-
+
MessageDataCollector collector;
for (uint i = 0; i < fix.queues.size(); i++) {
Receiver r = fix.session.createReceiver(fix.queues[i]);
@@ -292,10 +294,10 @@ QPID_AUTO_TEST_CASE(testSessionDispatch)
Sender s = fix.session.createSender(fix.queues[i]);
Message msg((boost::format("Message_%1%") % (i+1)).str());
s.send(msg);
- }
+ }
while (fix.session.dispatch(qpid::sys::TIME_SEC)) ;
-
+
BOOST_CHECK_EQUAL(collector.messageData, boost::assign::list_of<std::string>("Message_1")("Message_2")("Message_3"));
}
@@ -309,7 +311,7 @@ QPID_AUTO_TEST_CASE(testMapMessage)
out.getContent().asMap()["pi"] = 3.14f;
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
BOOST_CHECK_EQUAL(in.getContent().asMap()["abc"].asString(), "def");
BOOST_CHECK_EQUAL(in.getContent().asMap()["pi"].asFloat(), 3.14f);
fix.session.acknowledge();
@@ -324,11 +326,11 @@ QPID_AUTO_TEST_CASE(testListMessage)
out.getContent() << "abc";
out.getContent() << 1234;
out.getContent() << "def";
- out.getContent() << 56.789;
+ out.getContent() << 56.789;
sender.send(out);
Receiver receiver = fix.session.createReceiver(fix.queue);
- Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
- Variant::List& list = in.getContent().asList();
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ Variant::List& list = in.getContent().asList();
BOOST_CHECK_EQUAL(list.size(), out.getContent().asList().size());
BOOST_CHECK_EQUAL(list.front().asString(), "abc");
list.pop_front();
@@ -354,7 +356,86 @@ QPID_AUTO_TEST_CASE(testReject)
fix.session.reject(in);
in = receiver.fetch(5 * qpid::sys::TIME_SEC);
BOOST_CHECK_EQUAL(in.getBytes(), m2.getBytes());
- fix.session.acknowledge();
+ fix.session.acknowledge();
+}
+
+QPID_AUTO_TEST_CASE(testAvailable)
+{
+ MultiQueueFixture fix;
+
+ Receiver r1 = fix.session.createReceiver(fix.queues[0]);
+ r1.setCapacity(100);
+ r1.start();
+
+ Receiver r2 = fix.session.createReceiver(fix.queues[1]);
+ r2.setCapacity(100);
+ r2.start();
+
+ Sender s1 = fix.session.createSender(fix.queues[0]);
+ Sender s2 = fix.session.createSender(fix.queues[1]);
+
+ for (uint i = 0; i < 10; ++i) {
+ s1.send(Message((boost::format("A_%1%") % (i+1)).str()));
+ }
+ for (uint i = 0; i < 5; ++i) {
+ s2.send(Message((boost::format("B_%1%") % (i+1)).str()));
+ }
+ qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched?
+ for (uint i = 0; i < 5; ++i) {
+ BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i);
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+ BOOST_CHECK_EQUAL(r2.available(), 5u - i);
+ BOOST_CHECK_EQUAL(r2.fetch().getBytes(), (boost::format("B_%1%") % (i+1)).str());
+ fix.session.acknowledge();
+ }
+ for (uint i = 5; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(fix.session.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.available(), 10u - i);
+ BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str());
+ }
+}
+
+QPID_AUTO_TEST_CASE(testPendingAck)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+ fix.session.acknowledge();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u);
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testPendingSend)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ sender.send(Message((boost::format("Message_%1%") % (i+1)).str()));
+ }
+ //Note: this test relies on 'inside knowledge' of the sender
+ //implementation and the fact that the simple test case makes it
+ //possible to predict when completion information will be sent to
+ //the client. TODO: is there a better way of testing this?
+ BOOST_CHECK_EQUAL(sender.pending(), 10u);
+ fix.session.sync();
+ BOOST_CHECK_EQUAL(sender.pending(), 0u);
+
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ for (uint i = 0; i < 10; ++i) {
+ BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+ fix.session.acknowledge();
}
QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests