diff options
Diffstat (limited to 'cpp/src/tests/MessagingSessionTests.cpp')
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 991ec847bf..9d5db84bb4 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -611,6 +611,28 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) fix.admin.deleteQueue("q"); } +QPID_AUTO_TEST_CASE(testAssertExchangeOption) +{ + MessagingFixture fix; + std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}"; + Sender s1 = fix.session.createSender(a1); + s1.close(); + Receiver r1 = fix.session.createReceiver(a1); + r1.close(); + + std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}"; + Sender s2 = fix.session.createSender(a2); + s2.close(); + BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed); + + std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}"; + BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed); + Receiver r3 = fix.session.createReceiver(a3); + r3.close(); + + fix.admin.deleteExchange("e"); +} + QPID_AUTO_TEST_CASE(testGetSender) { QueueFixture fix; @@ -890,6 +912,212 @@ QPID_AUTO_TEST_CASE(testAcknowledge) BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); } +QPID_AUTO_TEST_CASE(testQmfCreateAndDelete) +{ + MessagingFixture fix(Broker::Options(), true/*enable management*/); + MethodInvoker control(fix.session); + control.createQueue("my-queue"); + control.createExchange("my-exchange", "topic"); + control.bind("my-exchange", "my-queue", "subject1"); + + Sender sender = fix.session.createSender("my-exchange"); + Receiver receiver = fix.session.createReceiver("my-queue"); + Message out; + out.setSubject("subject1"); + out.setContent("one"); + sender.send(out); + Message in; + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + control.unbind("my-exchange", "my-queue", "subject1"); + control.bind("my-exchange", "my-queue", "subject2"); + + out.setContent("two"); + sender.send(out);//should be dropped + + out.setSubject("subject2"); + out.setContent("three"); + sender.send(out);//should not be dropped + + BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); + BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); + BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE)); + sender.close(); + receiver.close(); + + control.deleteExchange("my-exchange"); + messaging::Session other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound); + } + control.deleteQueue("my-queue"); + other = fix.connection.createSession(); + { + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound); + } +} + +QPID_AUTO_TEST_CASE(testRejectAndCredit) +{ + //Ensure credit is restored on completing rejected messages + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Receiver receiver = fix.session.createReceiver(fix.queue); + + const uint count(10); + receiver.setCapacity(count); + for (uint i = 0; i < count; i++) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Message in; + for (uint i = 0; i < count; ++i) { + if (receiver.fetch(in, Duration::SECOND)) { + BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + fix.session.reject(in); + } else { + BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str()); + break; + } + } + //send another batch of messages + for (uint i = 0; i < count; i++) { + sender.send(Message((boost::format("Message_%1%") % (i+count)).str())); + } + + for (uint i = 0; i < count; ++i) { + if (receiver.fetch(in, Duration::SECOND)) { + BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str()); + } else { + BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str()); + break; + } + } + fix.session.acknowledge(); + receiver.close(); + sender.close(); +} + +QPID_AUTO_TEST_CASE(testTtlForever) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out("I want to live forever!"); + out.setTtl(Duration::FOREVER); + sender.send(out, true); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(Duration::IMMEDIATE); + fix.session.acknowledge(); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + BOOST_CHECK(in.getTtl() == Duration::FOREVER); +} + +QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str(); + Sender sender = fix.session.createSender(fix.topic); + Receiver receiver1 = fix.session.createReceiver(address); + { + ScopedSuppressLogging sl; + try { + fix.session.createReceiver(address); + fix.session.sync(); + BOOST_FAIL("Expected exception."); + } catch (const MessagingException& /*e*/) {} + } +} + +QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber) +{ + TopicFixture fix; + std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str(); + Receiver receiver1 = fix.session.createReceiver(address); + Receiver receiver2 = fix.session.createReceiver(address); + Sender sender = fix.session.createSender(fix.topic); + sender.send(Message("one"), true); + Message in = receiver1.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("one")); + sender.send(Message("two"), true); + in = receiver2.fetch(Duration::IMMEDIATE); + BOOST_CHECK_EQUAL(in.getContent(), std::string("two")); + fix.session.acknowledge(); +} + +QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + const uint count(20); + for (uint i = 0; i < count; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + + Session other = fix.connection.createSession(); + Receiver receiver = other.createReceiver(fix.queue); + std::vector<Message> messages; + for (uint i = 0; i < count; ++i) { + Message msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + messages.push_back(msg); + } + const uint batch = 10; + other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only + + messages.clear(); + other.sync(); + other.close(); + + other = fix.connection.createSession(); + receiver = other.createReceiver(fix.queue); + Message msg; + for (uint i = 0; i < (count-batch); ++i) { + msg = receiver.fetch(); + BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); + } + other.acknowledgeUpTo(msg); + other.sync(); + other.close(); + + Message m; + //check queue is empty + BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); +} + +QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange) +{ + QueueFixture fix; + Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str()); + Message out("test-message"); + out.setSubject("my-subject"); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(Duration::SECOND * 5); + fix.session.acknowledge(); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject()); +} + +QPID_AUTO_TEST_CASE(testUnsubscribeOnClose) +{ + MessagingFixture fix; + Sender sender = fix.session.createSender("my-exchange/my-subject; {create: always, delete:sender, node:{type:topic, x-declare:{alternate-exchange:amq.fanout}}}"); + Receiver receiver = fix.session.createReceiver("my-exchange/my-subject"); + Receiver deadletters = fix.session.createReceiver("amq.fanout"); + + sender.send(Message("first")); + Message in = receiver.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(in.getContent(), std::string("first")); + fix.session.acknowledge(); + receiver.close(); + sender.send(Message("second")); + in = deadletters.fetch(Duration::SECOND); + BOOST_CHECK_EQUAL(in.getContent(), std::string("second")); + fix.session.acknowledge(); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |