diff options
Diffstat (limited to 'cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 303 |
1 files changed, 293 insertions, 10 deletions
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 206f5ba691..fc39557a0e 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -21,6 +21,7 @@ #include "unit_test.h" #include "test_tools.h" #include "BrokerFixture.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" #include "qpid/messaging/ListContent.h" #include "qpid/messaging/ListView.h" @@ -33,7 +34,9 @@ #include "qpid/messaging/Session.h" #include "qpid/client/Connection.h" #include "qpid/client/Session.h" +#include "qpid/framing/ExchangeQueryResult.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/Uuid.h" #include "qpid/sys/Time.h" #include <boost/assign.hpp> #include <boost/format.hpp> @@ -48,6 +51,7 @@ QPID_AUTO_TEST_SUITE(MessagingSessionTests) using namespace qpid::messaging; using namespace qpid; using qpid::broker::Broker; +using qpid::framing::Uuid; struct BrokerAdmin { @@ -80,6 +84,18 @@ struct BrokerAdmin session.exchangeDelete(qpid::client::arg::exchange=name); } + bool checkQueueExists(const std::string& name) + { + return session.queueQuery(name).getQueue() == name; + } + + bool checkExchangeExists(const std::string& name, std::string& type) + { + qpid::framing::ExchangeQueryResult result = session.exchangeQuery(name); + type = result.getType(); + return !result.getNotFound(); + } + ~BrokerAdmin() { session.close(); @@ -99,6 +115,19 @@ struct MessagingFixture : public BrokerFixture session(connection.newSession()), admin(broker->getPort(Broker::TCP_TRANSPORT)) {} + void ping(const qpid::messaging::Address& address) + { + Receiver r = session.createReceiver(address); + Sender s = session.createSender(address); + Message out(Uuid(true).str()); + s.send(out); + Message in; + BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + r.cancel(); + s.cancel(); + } + ~MessagingFixture() { session.close(); @@ -178,6 +207,22 @@ std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duratio return data; } + +void send(Sender& sender, uint count = 1, uint start = 1, const std::string& base = "Message") +{ + for (uint i = start; i < start + count; ++i) { + sender.send(Message((boost::format("%1%_%2%") % base % i).str())); + } +} + +void receive(Receiver& receiver, uint count = 1, uint start = 1, + const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5) +{ + for (uint i = start; i < start + count; ++i) { + BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); + } +} + QPID_AUTO_TEST_CASE(testSimpleSendReceive) { QueueFixture fix; @@ -212,15 +257,19 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders) QPID_AUTO_TEST_CASE(testSenderError) { MessagingFixture fix; - //TODO: this is the wrong type for the exception; define explicit set in messaging namespace - BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::framing::NotFoundException); + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress {create:receiver, type:queue}"), + qpid::messaging::InvalidAddress); } QPID_AUTO_TEST_CASE(testReceiverError) { MessagingFixture fix; - //TODO: this is the wrong type for the exception; define explicit set in messaging namespace - BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::framing::NotFoundException); + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress {create:sender, type:queue}"), + qpid::messaging::InvalidAddress); } QPID_AUTO_TEST_CASE(testSimpleTopic) @@ -433,9 +482,7 @@ QPID_AUTO_TEST_CASE(testPendingSend) { QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 10; ++i) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } + send(sender, 10); //Note: this test relies on 'inside knowledge' of the sender //implementation and the fact that the simple test case makes it //possible to predict when completion information will be sent to @@ -445,12 +492,248 @@ QPID_AUTO_TEST_CASE(testPendingSend) BOOST_CHECK_EQUAL(sender.pending(), 0u); Receiver receiver = fix.session.createReceiver(fix.queue); - for (uint i = 0; i < 10; ++i) { - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } + receive(receiver, 10); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testBrowse) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + send(sender, 10); + Receiver browser1 = fix.session.createReceiver(fix.queue + " {browse:true}"); + receive(browser1, 10); + Receiver browser2 = fix.session.createReceiver(fix.queue + " {browse:true}"); + receive(browser2, 10); + Receiver consumer = fix.session.createReceiver(fix.queue); + receive(consumer, 10); fix.session.acknowledge(); } +struct QueueCreatePolicyFixture : public MessagingFixture +{ + qpid::messaging::Address address; + + QueueCreatePolicyFixture(const std::string& a) : address(a) {} + + void test() + { + ping(address); + BOOST_CHECK(admin.checkQueueExists(address.getName())); + } + + ~QueueCreatePolicyFixture() + { + admin.deleteQueue(address.getName()); + } +}; + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways) +{ + QueueCreatePolicyFixture fix("# {create:always, type:queue}"); + fix.test(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver) +{ + QueueCreatePolicyFixture fix("# {create:receiver, type:queue}"); + Receiver r = fix.session.createReceiver(fix.address); + fix.test(); + r.cancel(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender) +{ + QueueCreatePolicyFixture fix("# {create:sender, type:queue}"); + Sender s = fix.session.createSender(fix.address); + fix.test(); + s.cancel(); +} + +struct ExchangeCreatePolicyFixture : public MessagingFixture +{ + qpid::messaging::Address address; + const std::string exchangeType; + + ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) : + address(a), exchangeType(t) {} + + void test() + { + ping(address); + std::string actualType; + BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType)); + BOOST_CHECK_EQUAL(exchangeType, actualType); + } + + ~ExchangeCreatePolicyFixture() + { + admin.deleteExchange(address.getName()); + } +}; + +QPID_AUTO_TEST_CASE(testCreatePolicyTopic) +{ + ExchangeCreatePolicyFixture fix("# {create:always, type:topic, node-properties:{x-amqp0-10-exchange-type:topic}}", + "topic"); + fix.test(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout) +{ + ExchangeCreatePolicyFixture fix("#/my-subject {create:receiver, type:topic, node-properties:{x-amqp0-10-exchange-type:fanout}}", "fanout"); + Receiver r = fix.session.createReceiver(fix.address); + fix.test(); + r.cancel(); +} + +QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect) +{ + ExchangeCreatePolicyFixture fix("#/my-subject {create:sender, type:topic, node-properties:{x-amqp0-10-exchange-type:direct}}", "direct"); + Sender s = fix.session.createSender(fix.address); + fix.test(); + s.cancel(); +} + +struct DeletePolicyFixture : public MessagingFixture +{ + enum Mode {RECEIVER, SENDER, ALWAYS, NEVER}; + + std::string getPolicy(Mode mode) + { + switch (mode) { + case SENDER: + return "{delete:sender}"; + case RECEIVER: + return "{delete:receiver}"; + case ALWAYS: + return "{delete:always}"; + case NEVER: + return "{delete:never}"; + } + } + + void testAll() + { + test(RECEIVER); + test(SENDER); + test(ALWAYS); + test(NEVER); + } + + virtual ~DeletePolicyFixture() {} + virtual void create(const qpid::messaging::Address&) = 0; + virtual void destroy(const qpid::messaging::Address&) = 0; + virtual bool exists(const qpid::messaging::Address&) = 0; + + void test(Mode mode) + { + qpid::messaging::Address address("# " + getPolicy(mode)); + create(address); + + Sender s = session.createSender(address); + Receiver r = session.createReceiver(address); + switch (mode) { + case RECEIVER: + s.cancel(); + BOOST_CHECK(exists(address)); + r.cancel(); + BOOST_CHECK(!exists(address)); + break; + case SENDER: + r.cancel(); + BOOST_CHECK(exists(address)); + s.cancel(); + BOOST_CHECK(!exists(address)); + break; + case ALWAYS: + //Problematic case at present; multiple attempts to delete + //will result in all but one attempt failing and killing + //the session which is not desirable. TODO: better + //implementation of delete policy. + s.cancel(); + BOOST_CHECK(!exists(address)); + break; + case NEVER: + r.cancel(); + BOOST_CHECK(exists(address)); + s.cancel(); + BOOST_CHECK(exists(address)); + destroy(address); + } + } +}; + +struct QueueDeletePolicyFixture : DeletePolicyFixture +{ + void create(const qpid::messaging::Address& address) + { + admin.createQueue(address.getName()); + } + void destroy(const qpid::messaging::Address& address) + { + admin.deleteQueue(address.getName()); + } + bool exists(const qpid::messaging::Address& address) + { + return admin.checkQueueExists(address.getName()); + } +}; + +struct ExchangeDeletePolicyFixture : DeletePolicyFixture +{ + const std::string exchangeType; + ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {} + + void create(const qpid::messaging::Address& address) + { + admin.createExchange(address.getName(), exchangeType); + } + void destroy(const qpid::messaging::Address& address) + { + admin.deleteExchange(address.getName()); + } + bool exists(const qpid::messaging::Address& address) + { + std::string actualType; + return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType; + } +}; + +QPID_AUTO_TEST_CASE(testDeletePolicyQueue) +{ + QueueDeletePolicyFixture fix; + fix.testAll(); +} + +QPID_AUTO_TEST_CASE(testDeletePolicyExchange) +{ + ExchangeDeletePolicyFixture fix; + fix.testAll(); +} + +QPID_AUTO_TEST_CASE(testAssertPolicyQueue) +{ + MessagingFixture fix; + std::string a1 = "q {create:always, assert:always, type:queue, node-properties:{durable:false, x-amqp0-10-arguments:{qpid.max-count:100}}}"; + Sender s1 = fix.session.createSender(a1); + s1.cancel(); + Receiver r1 = fix.session.createReceiver(a1); + r1.cancel(); + + std::string a2 = "q {assert:receiver, node-properties:{durable:true, x-amqp0-10-arguments:{qpid.max-count:100}}}"; + Sender s2 = fix.session.createSender(a2); + s2.cancel(); + BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress); + + std::string a3 = "q {assert:sender, node-properties:{x-amqp0-10-arguments:{qpid.max-count:99}}}"; + BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress); + Receiver r3 = fix.session.createReceiver(a3); + r3.cancel(); + + fix.admin.deleteQueue("q"); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |