diff options
| author | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2016-07-05 21:55:35 +0000 |
| commit | f160cb6566c17945f7ebc4f3a752b2cc6a051685 (patch) | |
| tree | 809f04fc1967c22e5abc52de07602555bed0e920 /qpid/cpp/src/tests/MessagingSessionTests.cpp | |
| parent | ebb276cca41582b73223b55eff9f2d4386f4f746 (diff) | |
| download | qpid-python-f160cb6566c17945f7ebc4f3a752b2cc6a051685.tar.gz | |
QPID-7207: remove cpp and python subdirs from svn trunk, they have migrated to their own git repositories
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1751566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 1628 |
1 files changed, 0 insertions, 1628 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp deleted file mode 100644 index 3b7ba34fe9..0000000000 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ /dev/null @@ -1,1628 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MessagingFixture.h" -#include "unit_test.h" -#include "test_tools.h" -#include "qpid/messaging/Address.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Session.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Session.h" -#include "qpid/framing/ExchangeQueryResult.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Time.h" -#include <boost/assign.hpp> -#include <boost/format.hpp> -#include <string> -#include <vector> - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(MessagingSessionTests) - -using namespace qpid::messaging; -using namespace qpid::types; -using namespace qpid; -using qpid::broker::BrokerOptions; -using qpid::framing::Uuid; - - -QPID_AUTO_TEST_CASE(testSimpleSendReceive) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - out.setSubject("test-subject"); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject()); -} - -QPID_AUTO_TEST_CASE(testSyncSendReceive) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - sender.send(out, true); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::IMMEDIATE); - fix.session.acknowledge(true); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); -} - -QPID_AUTO_TEST_CASE(testSendReceiveHeaders) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - for (uint i = 0; i < 10; ++i) { - out.getProperties()["a"] = i; - out.setProperty("b", i + 100); - sender.send(out); - } - uint8_t v1(255u); - int8_t v2(-120); - out.getProperties()["c"] = v1; - out.getProperties()["d"] = v2; - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in; - for (uint i = 0; i < 10; ++i) { - BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5)); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - BOOST_CHECK_EQUAL(in.getProperties()["a"].asUint32(), i); - BOOST_CHECK_EQUAL(in.getProperties()["b"].asUint32(), i + 100); - fix.session.acknowledge(); - } - BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5)); - Variant& c = in.getProperties()["c"]; - BOOST_CHECK_EQUAL(c.getType(), VAR_UINT8); - BOOST_CHECK_EQUAL(c.asUint8(), v1); - Variant& d = in.getProperties()["d"]; - BOOST_CHECK_EQUAL(d.getType(), VAR_INT8); - BOOST_CHECK_EQUAL(d.asInt8(), v2); -} - -QPID_AUTO_TEST_CASE(testSenderError) -{ - MessagingFixture fix; - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::NotFound); - fix.session = fix.connection.createSession(); - BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"), - qpid::messaging::NotFound); -} - -QPID_AUTO_TEST_CASE(testReceiverError) -{ - MessagingFixture fix; - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::NotFound); - fix.session = fix.connection.createSession(); - BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"), - qpid::messaging::NotFound); -} - -QPID_AUTO_TEST_CASE(testSimpleTopic) -{ - TopicFixture fix; - - Sender sender = fix.session.createSender(fix.topic); - Message msg("one"); - sender.send(msg); - Receiver sub1 = fix.session.createReceiver(fix.topic); - sub1.setCapacity(10u); - msg.setContent("two"); - sender.send(msg); - Receiver sub2 = fix.session.createReceiver(fix.topic); - sub2.setCapacity(10u); - msg.setContent("three"); - sender.send(msg); - Receiver sub3 = fix.session.createReceiver(fix.topic); - sub3.setCapacity(10u); - msg.setContent("four"); - sender.send(msg); - BOOST_CHECK_EQUAL(fetch(sub2, 2), boost::assign::list_of<std::string>("three")("four")); - sub2.close(); - - msg.setContent("five"); - sender.send(msg); - BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five")); - BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five")); - Message in; - BOOST_CHECK(!sub2.fetch(in, Duration::IMMEDIATE));//TODO: or should this raise an error? - - - //TODO: check pending messages... -} - -QPID_AUTO_TEST_CASE(testNextReceiver) -{ - MultiQueueFixture fix; - - for (uint i = 0; i < fix.queues.size(); i++) { - Receiver r = fix.session.createReceiver(fix.queues[i]); - r.setCapacity(10u); - } - - for (uint i = 0; i < fix.queues.size(); i++) { - Sender s = fix.session.createSender(fix.queues[i]); - Message msg((boost::format("Message_%1%") % (i+1)).str()); - s.send(msg); - } - - for (uint i = 0; i < fix.queues.size(); i++) { - Message msg; - BOOST_CHECK(fix.session.nextReceiver().fetch(msg, Duration::SECOND)); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testMapMessage) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out; - Variant::Map content; - content["abc"] = "def"; - content["pi"] = 3.14f; - Variant utf8("A utf 8 string"); - utf8.setEncoding("utf8"); - content["utf8"] = utf8; - Variant utf16("\x00\x61\x00\x62\x00\x63"); - utf16.setEncoding("utf16"); - content["utf16"] = utf16; - encode(content, out); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * Duration::SECOND); - Variant::Map view; - decode(in, view); - BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); - BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); - BOOST_CHECK_EQUAL(view["utf8"].asString(), utf8.asString()); - BOOST_CHECK_EQUAL(view["utf8"].getEncoding(), utf8.getEncoding()); - BOOST_CHECK_EQUAL(view["utf16"].asString(), utf16.asString()); - BOOST_CHECK_EQUAL(view["utf16"].getEncoding(), utf16.getEncoding()); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testMapMessageWithInitial) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out; - Variant::Map imap; - imap["abc"] = "def"; - imap["pi"] = 3.14f; - encode(imap, out); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * Duration::SECOND); - Variant::Map view; - decode(in, view); - BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); - BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testListMessage) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out; - Variant::List content; - content.push_back(Variant("abc")); - content.push_back(Variant(1234)); - content.push_back(Variant("def")); - content.push_back(Variant(56.789)); - encode(content, out); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * Duration::SECOND); - Variant::List view; - decode(in, view); - BOOST_CHECK_EQUAL(view.size(), content.size()); - BOOST_CHECK_EQUAL(view.front().asString(), "abc"); - BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); - - Variant::List::const_iterator i = view.begin(); - BOOST_CHECK(i != view.end()); - BOOST_CHECK_EQUAL(i->asString(), "abc"); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asInt64(), 1234); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asString(), "def"); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asDouble(), 56.789); - BOOST_CHECK(++i == view.end()); - - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testListMessageWithInitial) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out; - Variant::List ilist; - ilist.push_back(Variant("abc")); - ilist.push_back(Variant(1234)); - ilist.push_back(Variant("def")); - ilist.push_back(Variant(56.789)); - encode(ilist, out); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * Duration::SECOND); - Variant::List view; - decode(in, view); - BOOST_CHECK_EQUAL(view.size(), ilist.size()); - BOOST_CHECK_EQUAL(view.front().asString(), "abc"); - BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); - - Variant::List::const_iterator i = view.begin(); - BOOST_CHECK(i != view.end()); - BOOST_CHECK_EQUAL(i->asString(), "abc"); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asInt64(), 1234); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asString(), "def"); - BOOST_CHECK(++i != view.end()); - BOOST_CHECK_EQUAL(i->asDouble(), 56.789); - BOOST_CHECK(++i == view.end()); - - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testReject) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message m1("reject-me"); - sender.send(m1); - Message m2("accept-me"); - sender.send(m2); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * Duration::SECOND); - BOOST_CHECK_EQUAL(in.getContent(), m1.getContent()); - fix.session.reject(in); - in = receiver.fetch(5 * Duration::SECOND); - BOOST_CHECK_EQUAL(in.getContent(), m2.getContent()); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testAvailable) -{ - MultiQueueFixture fix; - - Receiver r1 = fix.session.createReceiver(fix.queues[0]); - r1.setCapacity(100); - - Receiver r2 = fix.session.createReceiver(fix.queues[1]); - r2.setCapacity(100); - - Sender s1 = fix.session.createSender(fix.queues[0]); - Sender s2 = fix.session.createSender(fix.queues[1]); - - for (uint i = 0; i < 10; ++i) { - s1.send(Message((boost::format("A_%1%") % (i+1)).str())); - } - for (uint i = 0; i < 5; ++i) { - s2.send(Message((boost::format("B_%1%") % (i+1)).str())); - } - qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched? - for (uint i = 0; i < 5; ++i) { - BOOST_CHECK_EQUAL(fix.session.getReceivable(), 15u - 2*i); - BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); - BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); - BOOST_CHECK_EQUAL(r2.getAvailable(), 5u - i); - BOOST_CHECK_EQUAL(r2.fetch().getContent(), (boost::format("B_%1%") % (i+1)).str()); - fix.session.acknowledge(); - } - for (uint i = 5; i < 10; ++i) { - BOOST_CHECK_EQUAL(fix.session.getReceivable(), 10u - i); - BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); - BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testUnsettledAcks) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 10; ++i) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } - Receiver receiver = fix.session.createReceiver(fix.queue); - for (uint i = 0; i < 10; ++i) { - BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } - BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 10u); - fix.session.sync(); - BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u); -} - -QPID_AUTO_TEST_CASE(testUnsettledSend) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - send(sender, 10); - //Note: this test relies on 'inside knowledge' of the sender - //implementation and the fact that the simple test case makes it - //possible to predict when completion information will be sent to - //the client. TODO: is there a better way of testing this? - BOOST_CHECK_EQUAL(sender.getUnsettled(), 10u); - fix.session.sync(); - BOOST_CHECK_EQUAL(sender.getUnsettled(), 0u); - - Receiver receiver = fix.session.createReceiver(fix.queue); - receive(receiver, 10); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testBrowse) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - send(sender, 10); - Receiver browser1 = fix.session.createReceiver(fix.queue + "; {mode:browse}"); - receive(browser1, 10); - Receiver browser2 = fix.session.createReceiver(fix.queue + "; {mode:browse}"); - receive(browser2, 10); - Receiver releaser1 = fix.session.createReceiver(fix.queue); - Message m1 = releaser1.fetch(messaging::Duration::SECOND*5); - BOOST_CHECK(!m1.getRedelivered()); - fix.session.release(m1); - Receiver releaser2 = fix.session.createReceiver(fix.queue); - Message m2 = releaser2.fetch(messaging::Duration::SECOND*5); - BOOST_CHECK(m2.getRedelivered()); - fix.session.release(m2); - Receiver consumer = fix.session.createReceiver(fix.queue); - receive(consumer, 10); - fix.session.acknowledge(); -} - -struct QueueCreatePolicyFixture : public MessagingFixture -{ - qpid::messaging::Address address; - - QueueCreatePolicyFixture(const std::string& a) : address(a) {} - - void test() - { - ping(address); - BOOST_CHECK(admin.checkQueueExists(address.getName())); - } - - ~QueueCreatePolicyFixture() - { - admin.deleteQueue(address.getName()); - } -}; - -QPID_AUTO_TEST_CASE(testCreatePolicyQueueAlways) -{ - QueueCreatePolicyFixture fix("#; {create:always, node:{type:queue}}"); - fix.test(); -} - -QPID_AUTO_TEST_CASE(testCreatePolicyQueueReceiver) -{ - QueueCreatePolicyFixture fix("#; {create:receiver, node:{type:queue}}"); - Receiver r = fix.session.createReceiver(fix.address); - fix.test(); - r.close(); -} - -QPID_AUTO_TEST_CASE(testCreatePolicyQueueSender) -{ - QueueCreatePolicyFixture fix("#; {create:sender, node:{type:queue}}"); - Sender s = fix.session.createSender(fix.address); - fix.test(); - s.close(); -} - -struct ExchangeCreatePolicyFixture : public MessagingFixture -{ - qpid::messaging::Address address; - const std::string exchangeType; - - ExchangeCreatePolicyFixture(const std::string& a, const std::string& t) : - address(a), exchangeType(t) {} - - void test() - { - ping(address); - std::string actualType; - BOOST_CHECK(admin.checkExchangeExists(address.getName(), actualType)); - BOOST_CHECK_EQUAL(exchangeType, actualType); - } - - ~ExchangeCreatePolicyFixture() - { - admin.deleteExchange(address.getName()); - } -}; - -QPID_AUTO_TEST_CASE(testCreatePolicyTopic) -{ - ExchangeCreatePolicyFixture fix("#; {create:always, node:{type:topic}}", - "topic"); - fix.test(); -} - -QPID_AUTO_TEST_CASE(testCreatePolicyTopicReceiverFanout) -{ - ExchangeCreatePolicyFixture fix("#/my-subject; {create:receiver, node:{type:topic, x-declare:{type:fanout}}}", "fanout"); - Receiver r = fix.session.createReceiver(fix.address); - fix.test(); - r.close(); -} - -QPID_AUTO_TEST_CASE(testCreatePolicyTopicSenderDirect) -{ - ExchangeCreatePolicyFixture fix("#/my-subject; {create:sender, node:{type:topic, x-declare:{type:direct}}}", "direct"); - Sender s = fix.session.createSender(fix.address); - fix.test(); - s.close(); -} - -struct DeletePolicyFixture : public MessagingFixture -{ - enum Mode {RECEIVER, SENDER, ALWAYS, NEVER}; - - std::string getPolicy(Mode mode) - { - switch (mode) { - case SENDER: - return "{delete:sender}"; - case RECEIVER: - return "{delete:receiver}"; - case ALWAYS: - return "{delete:always}"; - case NEVER: - return "{delete:never}"; - } - return ""; - } - - void testAll() - { - test(RECEIVER); - test(SENDER); - test(ALWAYS); - test(NEVER); - } - - virtual ~DeletePolicyFixture() {} - virtual void create(const qpid::messaging::Address&) = 0; - virtual void destroy(const qpid::messaging::Address&) = 0; - virtual bool exists(const qpid::messaging::Address&) = 0; - - void test(Mode mode) - { - qpid::messaging::Address address("testqueue#; " + getPolicy(mode)); - create(address); - - Sender s = session.createSender(address); - Receiver r = session.createReceiver(address); - switch (mode) { - case RECEIVER: - s.close(); - BOOST_CHECK(exists(address)); - r.close(); - BOOST_CHECK(!exists(address)); - break; - case SENDER: - r.close(); - BOOST_CHECK(exists(address)); - s.close(); - BOOST_CHECK(!exists(address)); - break; - case ALWAYS: - s.close(); - BOOST_CHECK(!exists(address)); - break; - case NEVER: - r.close(); - BOOST_CHECK(exists(address)); - s.close(); - BOOST_CHECK(exists(address)); - destroy(address); - } - } -}; - -struct QueueDeletePolicyFixture : DeletePolicyFixture -{ - void create(const qpid::messaging::Address& address) - { - admin.createQueue(address.getName()); - } - void destroy(const qpid::messaging::Address& address) - { - admin.deleteQueue(address.getName()); - } - bool exists(const qpid::messaging::Address& address) - { - return admin.checkQueueExists(address.getName()); - } -}; - -struct ExchangeDeletePolicyFixture : DeletePolicyFixture -{ - const std::string exchangeType; - ExchangeDeletePolicyFixture(const std::string type = "topic") : exchangeType(type) {} - - void create(const qpid::messaging::Address& address) - { - admin.createExchange(address.getName(), exchangeType); - } - void destroy(const qpid::messaging::Address& address) - { - admin.deleteExchange(address.getName()); - } - bool exists(const qpid::messaging::Address& address) - { - std::string actualType; - return admin.checkExchangeExists(address.getName(), actualType) && actualType == exchangeType; - } -}; - -QPID_AUTO_TEST_CASE(testDeletePolicyQueue) -{ - QueueDeletePolicyFixture fix; - fix.testAll(); -} - -QPID_AUTO_TEST_CASE(testDeletePolicyExchange) -{ - ExchangeDeletePolicyFixture fix; - fix.testAll(); -} - -QPID_AUTO_TEST_CASE(testAssertPolicyQueue) -{ - MessagingFixture fix; - std::string a1 = "q; {create:always, assert:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.max-count:100}}}}"; - Sender s1 = fix.session.createSender(a1); - s1.close(); - Receiver r1 = fix.session.createReceiver(a1); - r1.close(); - - std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; - Sender s2 = fix.session.createSender(a2); - s2.close(); - BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed); - - std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}"; - BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed); - Receiver r3 = fix.session.createReceiver(a3); - r3.close(); - - fix.admin.deleteQueue("q"); -} - -QPID_AUTO_TEST_CASE(testAssertExchangeOption) -{ - MessagingFixture fix; - std::string a1 = "e; {create:always, assert:always, node:{type:topic, x-declare:{type:direct, arguments:{qpid.msg_sequence:True}}}}"; - Sender s1 = fix.session.createSender(a1); - s1.close(); - Receiver r1 = fix.session.createReceiver(a1); - r1.close(); - - std::string a2 = "e; {assert:receiver, node:{type:topic, x-declare:{type:fanout, arguments:{qpid.msg_sequence:True}}}}"; - Sender s2 = fix.session.createSender(a2); - s2.close(); - BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed); - - std::string a3 = "e; {assert:sender, node:{x-declare:{arguments:{qpid.msg_sequence:False}}}}"; - BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed); - Receiver r3 = fix.session.createReceiver(a3); - r3.close(); - - fix.admin.deleteExchange("e"); -} - -QPID_AUTO_TEST_CASE(testGetSender) -{ - QueueFixture fix; - std::string name = fix.session.createSender(fix.queue).getName(); - Sender sender = fix.session.getSender(name); - BOOST_CHECK_EQUAL(name, sender.getName()); - Message out(Uuid(true).str()); - sender.send(out); - Message in; - BOOST_CHECK(fix.session.createReceiver(fix.queue).fetch(in)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); - BOOST_CHECK_THROW(fix.session.getSender("UnknownSender"), qpid::messaging::KeyError); -} - -QPID_AUTO_TEST_CASE(testGetReceiver) -{ - QueueFixture fix; - std::string name = fix.session.createReceiver(fix.queue).getName(); - Receiver receiver = fix.session.getReceiver(name); - BOOST_CHECK_EQUAL(name, receiver.getName()); - Message out(Uuid(true).str()); - fix.session.createSender(fix.queue).send(out); - Message in; - BOOST_CHECK(receiver.fetch(in)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); - BOOST_CHECK_THROW(fix.session.getReceiver("UnknownReceiver"), qpid::messaging::KeyError); -} - -QPID_AUTO_TEST_CASE(testGetSessionFromConnection) -{ - QueueFixture fix; - fix.connection.createSession("my-session"); - Session session = fix.connection.getSession("my-session"); - Message out(Uuid(true).str()); - session.createSender(fix.queue).send(out); - Message in; - BOOST_CHECK(session.createReceiver(fix.queue).fetch(in)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); - BOOST_CHECK_THROW(fix.connection.getSession("UnknownSession"), qpid::messaging::KeyError); -} - -QPID_AUTO_TEST_CASE(testGetConnectionFromSession) -{ - QueueFixture fix; - Message out(Uuid(true).str()); - Sender sender = fix.session.createSender(fix.queue); - sender.send(out); - Message in; - sender.getSession().getConnection().createSession("incoming"); - BOOST_CHECK(fix.connection.getSession("incoming").createReceiver(fix.queue).fetch(in)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); -} - -QPID_AUTO_TEST_CASE(testTx) -{ - QueueFixture fix; - Session ssn1 = fix.connection.createTransactionalSession(); - Session ssn2 = fix.connection.createTransactionalSession(); - Sender sender1 = ssn1.createSender(fix.queue); - Sender sender2 = ssn2.createSender(fix.queue); - Receiver receiver1 = ssn1.createReceiver(fix.queue); - Receiver receiver2 = ssn2.createReceiver(fix.queue); - Message in; - - send(sender1, 5, 1, "A"); - send(sender2, 5, 1, "B"); - ssn2.commit(); - receive(receiver1, 5, 1, "B");//(only those from sender2 should be received) - BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages - ssn1.rollback(); - receive(receiver2, 5, 1, "B"); - BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));//check there are no more messages - ssn2.rollback(); - receive(receiver1, 5, 1, "B"); - BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages - ssn1.commit(); - //check neither receiver gets any more messages: - BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE)); - BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); -} - -QPID_AUTO_TEST_CASE(testRelease) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - sender.send(out, true); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message m1 = receiver.fetch(Duration::IMMEDIATE); - fix.session.release(m1); - Message m2 = receiver.fetch(Duration::SECOND * 1); - BOOST_CHECK_EQUAL(m1.getContent(), out.getContent()); - BOOST_CHECK_EQUAL(m1.getContent(), m2.getContent()); - BOOST_CHECK(m2.getRedelivered()); - fix.session.acknowledge(true); -} - -QPID_AUTO_TEST_CASE(testOptionVerification) -{ - MessagingFixture fix; - fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); -} - -QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) -{ - QueueFixture fix; - - qpid::client::Message out; - out.getDeliveryProperties().setRoutingKey(fix.queue); - out.getMessageProperties().setAppId("my-app-id"); - out.getMessageProperties().setMessageId(qpid::framing::Uuid(true)); - out.getMessageProperties().setContentEncoding("my-content-encoding"); - fix.admin.send(out); - - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::SECOND * 5); - BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.routing-key"].asString(), out.getDeliveryProperties().getRoutingKey()); - BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.app-id"].asString(), out.getMessageProperties().getAppId()); - BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.content-encoding"].asString(), out.getMessageProperties().getContentEncoding()); - BOOST_CHECK_EQUAL(in.getMessageId(), out.getMessageProperties().getMessageId().str()); - fix.session.acknowledge(true); -} - -QPID_AUTO_TEST_CASE(testSendSpecialProperties) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - std::string appId = "my-app-id"; - std::string contentEncoding = "my-content-encoding"; - out.getProperties()["x-amqp-0-10.app-id"] = appId; - out.getProperties()["x-amqp-0-10.content-encoding"] = contentEncoding; - out.setMessageId(qpid::framing::Uuid(true).str()); - sender.send(out, true); - - qpid::client::LocalQueue q; - qpid::client::SubscriptionManager subs(fix.admin.session); - qpid::client::Subscription s = subs.subscribe(q, fix.queue); - qpid::client::Message in = q.get(); - s.cancel(); - fix.admin.session.sync(); - - BOOST_CHECK_EQUAL(in.getMessageProperties().getAppId(), appId); - BOOST_CHECK_EQUAL(in.getMessageProperties().getContentEncoding(), contentEncoding); - BOOST_CHECK_EQUAL(in.getMessageProperties().getMessageId().str(), out.getMessageId()); -} - -QPID_AUTO_TEST_CASE(testExclusiveSubscriber) -{ - QueueFixture fix; - std::string address = (boost::format("%1%; { link: { x-subscribe : { exclusive:true } } }") % fix.queue).str(); - Receiver receiver = fix.session.createReceiver(address); - ScopedSuppressLogging sl; - try { - fix.session.createReceiver(address); - fix.session.sync(); - BOOST_FAIL("Expected exception."); - } catch (const MessagingException& /*e*/) {} -} - - -QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) -{ - MessagingFixture fix; - - std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; - std::string browseAddress = "exclusive-queue; { mode: browse }"; - - Receiver receiver = fix.session.createReceiver(address); - fix.session.sync(); - - Connection c2 = fix.newConnection(); - c2.open(); - Session s2 = c2.createSession(); - - BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); -} - - -QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) -{ - MessagingFixture fix; - const uint capacity = 5; - - Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); - Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); - Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); - - receiver1.setCapacity(capacity); - receiver2.setCapacity(capacity*2); - - Message out("test-message"); - for (uint i = 0; i < capacity*2; ++i) { - sender.send(out); - } - - receiver1.close(); - - // Make sure all pending messages were sent to the alternate - // exchange when the queue was deleted. - Message in; - for (uint i = 0; i < capacity*2; ++i) { - in = receiver2.fetch(Duration::SECOND * 5); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - } -} - -QPID_AUTO_TEST_CASE(testAuthenticatedUsername) -{ - MessagingFixture fix; - Connection connection = fix.newConnection(); - connection.setOption("sasl-mechanism", "PLAIN"); - connection.setOption("username", "test-user"); - connection.setOption("password", "ignored"); - connection.open(); - BOOST_CHECK_EQUAL(connection.getAuthenticatedUsername(), std::string("test-user")); -} - -QPID_AUTO_TEST_CASE(testExceptionOnClosedConnection) -{ - MessagingFixture fix; - fix.connection.close(); - BOOST_CHECK_THROW(fix.connection.createSession(), MessagingException); - Connection connection("blah"); - BOOST_CHECK_THROW(connection.createSession(), MessagingException); -} - -QPID_AUTO_TEST_CASE(testAcknowledge) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - const uint count(20); - for (uint i = 0; i < count; ++i) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } - - Session other = fix.connection.createSession(); - Receiver receiver = other.createReceiver(fix.queue); - std::vector<Message> messages; - for (uint i = 0; i < count; ++i) { - Message msg = receiver.fetch(); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - messages.push_back(msg); - } - const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { - other.acknowledge(messages[i]); - } - messages.clear(); - other.sync(); - other.close(); - - other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); - for (uint i = 0; i < (count-batch); ++i) { - Message msg = receiver.fetch(); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); - if (i % 2) other.acknowledge(msg); //acknowledge every other message - } - other.sync(); - other.close(); - - //check unacknowledged messages are still enqueued - other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); - for (uint i = 0; i < ((count-batch)/2); ++i) { - Message msg = receiver.fetch(); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); - } - other.acknowledge();//acknowledge all messages - other.sync(); - other.close(); - - Message m; - //check queue is empty - BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); -} - -QPID_AUTO_TEST_CASE(testQmfCreateAndDelete) -{ - MessagingFixture fix(BrokerOptions(), true/*enable management*/); - MethodInvoker control(fix.session); - control.createQueue("my-queue"); - control.createExchange("my-exchange", "topic"); - control.bind("my-exchange", "my-queue", "subject1"); - - Sender sender = fix.session.createSender("my-exchange"); - Receiver receiver = fix.session.createReceiver("my-queue"); - Message out; - out.setSubject("subject1"); - out.setContent("one"); - sender.send(out); - Message in; - BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); - control.unbind("my-exchange", "my-queue", "subject1"); - control.bind("my-exchange", "my-queue", "subject2"); - - out.setContent("two"); - sender.send(out);//should be dropped - - out.setSubject("subject2"); - out.setContent("three"); - sender.send(out);//should not be dropped - - BOOST_CHECK(receiver.fetch(in, Duration::SECOND*5)); - BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); - BOOST_CHECK(!receiver.fetch(in, Duration::IMMEDIATE)); - sender.close(); - receiver.close(); - - control.deleteExchange("my-exchange"); - messaging::Session other = fix.connection.createSession(); - { - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(other.createSender("my-exchange"), qpid::messaging::NotFound); - } - control.deleteQueue("my-queue"); - other = fix.connection.createSession(); - { - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(other.createReceiver("my-queue"), qpid::messaging::NotFound); - } -} - -QPID_AUTO_TEST_CASE(testRejectAndCredit) -{ - //Ensure credit is restored on completing rejected messages - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Receiver receiver = fix.session.createReceiver(fix.queue); - - const uint count(10); - receiver.setCapacity(count); - for (uint i = 0; i < count; i++) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } - - Message in; - for (uint i = 0; i < count; ++i) { - if (receiver.fetch(in, Duration::SECOND)) { - BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - fix.session.reject(in); - } else { - BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+1)).str()); - break; - } - } - //send another batch of messages - for (uint i = 0; i < count; i++) { - sender.send(Message((boost::format("Message_%1%") % (i+count)).str())); - } - - for (uint i = 0; i < count; ++i) { - if (receiver.fetch(in, Duration::SECOND)) { - BOOST_CHECK_EQUAL(in.getContent(), (boost::format("Message_%1%") % (i+count)).str()); - } else { - BOOST_FAIL((boost::format("Message_%1% not received as expected") % (i+count)).str()); - break; - } - } - fix.session.acknowledge(); - receiver.close(); - sender.close(); -} - -QPID_AUTO_TEST_CASE(testTtlForever) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("I want to live forever!"); - out.setTtl(Duration::FOREVER); - sender.send(out, true); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::IMMEDIATE); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - BOOST_CHECK(in.getTtl() == Duration::FOREVER); -} - -QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber) -{ - TopicFixture fix; - std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str(); - Sender sender = fix.session.createSender(fix.topic); - Receiver receiver1 = fix.session.createReceiver(address); - { - ScopedSuppressLogging sl; - try { - fix.session.createReceiver(address); - fix.session.sync(); - BOOST_FAIL("Expected exception."); - } catch (const MessagingException& /*e*/) {} - } -} - -QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber) -{ - TopicFixture fix; - std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str(); - Receiver receiver1 = fix.session.createReceiver(address); - Receiver receiver2 = fix.session.createReceiver(address); - Sender sender = fix.session.createSender(fix.topic); - sender.send(Message("one"), true); - Message in = receiver1.fetch(Duration::IMMEDIATE); - BOOST_CHECK_EQUAL(in.getContent(), std::string("one")); - sender.send(Message("two"), true); - in = receiver2.fetch(Duration::IMMEDIATE); - BOOST_CHECK_EQUAL(in.getContent(), std::string("two")); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testAcknowledgeUpTo) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - const uint count(20); - for (uint i = 0; i < count; ++i) { - sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); - } - - Session other = fix.connection.createSession(); - Receiver receiver = other.createReceiver(fix.queue); - std::vector<Message> messages; - for (uint i = 0; i < count; ++i) { - Message msg = receiver.fetch(); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - messages.push_back(msg); - } - const uint batch = 10; - other.acknowledgeUpTo(messages[batch-1]);//acknowledge first 10 messages only - - messages.clear(); - other.sync(); - other.close(); - - other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); - Message msg; - for (uint i = 0; i < (count-batch); ++i) { - msg = receiver.fetch(); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); - } - other.acknowledgeUpTo(msg); - other.sync(); - other.close(); - - Message m; - //check queue is empty - BOOST_CHECK(!fix.session.createReceiver(fix.queue).fetch(m, Duration::IMMEDIATE)); -} - -QPID_AUTO_TEST_CASE(testCreateBindingsOnStandardExchange) -{ - QueueFixture fix; - Sender sender = fix.session.createSender((boost::format("amq.direct; {create:always, node:{type:topic, x-bindings:[{queue:%1%, key:my-subject}]}}") % fix.queue).str()); - Message out("test-message"); - out.setSubject("my-subject"); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - BOOST_CHECK_EQUAL(in.getSubject(), out.getSubject()); -} - -QPID_AUTO_TEST_CASE(testUnsubscribeOnClose) -{ - MessagingFixture fix; - Sender sender = fix.session.createSender("my-exchange/my-subject; {create: always, delete:sender, node:{type:topic, x-declare:{alternate-exchange:amq.fanout}}}"); - Receiver receiver = fix.session.createReceiver("my-exchange/my-subject"); - Receiver deadletters = fix.session.createReceiver("amq.fanout"); - - sender.send(Message("first")); - Message in = receiver.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(in.getContent(), std::string("first")); - fix.session.acknowledge(); - receiver.close(); - sender.send(Message("second")); - in = deadletters.fetch(Duration::SECOND); - BOOST_CHECK_EQUAL(in.getContent(), std::string("second")); - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testHeadersExchange) -{ - MessagingFixture fix; - //use both quoted and unquoted values - Receiver receiver = fix.session.createReceiver("amq.match; {link:{x-bindings:[{arguments:{x-match:all,qpid.subject:'abc',my-property:abc}}]}}"); - Sender sender = fix.session.createSender("amq.match"); - Message out("test-message"); - out.setSubject("abc"); - Variant& property = out.getProperties()["my-property"]; - property = "abc"; - property.setEncoding("utf8"); - sender.send(out, true); - Message in; - if (receiver.fetch(in, Duration::SECOND)) { - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - } else { - BOOST_FAIL("Message did not match as expected!"); - } -} - -QPID_AUTO_TEST_CASE(testLargeRoutingKey) -{ - MessagingFixture fix; - std::string address = "amq.direct/" + std::string(300, 'x');//routing/binding key can be at most 225 chars in 0-10 - BOOST_CHECK_THROW(fix.session.createReceiver(address), qpid::messaging::MessagingException); -} - -QPID_AUTO_TEST_CASE(testAlternateExchangeInLinkDeclare) -{ - MessagingFixture fix; - Sender s = fix.session.createSender("amq.direct/key"); - Receiver r1 = fix.session.createReceiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}"); - Receiver r2 = fix.session.createReceiver("amq.fanout"); - - for (uint i = 0; i < 10; ++i) { - s.send(Message((boost::format("Message_%1%") % (i+1)).str()), true); - } - r1.close();//orphans all messages in subscription queue, which should then be routed through alternate exchange - for (uint i = 0; i < 10; ++i) { - Message received; - BOOST_CHECK(r2.fetch(received, Duration::SECOND)); - BOOST_CHECK_EQUAL(received.getContent(), (boost::format("Message_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testBrowseOnly) -{ - /* Set up a queue browse-only, and try to receive - the same messages twice with two different receivers. - This works because the browse-only queue does not - allow message acquisition. */ - - QueueFixture fix; - std::string addr = "q; {create:always, node:{type:queue, durable:false, x-declare:{arguments:{qpid.browse-only:1}}}}"; - Sender sender = fix.session.createSender(addr); - Message out("test-message"); - - int count = 10; - for ( int i = 0; i < count; ++ i ) { - sender.send(out); - } - - Message m; - - Receiver receiver_1 = fix.session.createReceiver(addr); - for ( int i = 0; i < count; ++ i ) { - BOOST_CHECK(receiver_1.fetch(m, Duration::SECOND)); - } - - Receiver receiver_2 = fix.session.createReceiver(addr); - for ( int i = 0; i < count; ++ i ) { - BOOST_CHECK(receiver_2.fetch(m, Duration::SECOND)); - } - - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testLinkBindingCleanup) -{ - MessagingFixture fix; - - Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); - - Connection connection = fix.newConnection(); - connection.open(); - - Session session(connection.createSession()); - Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); - Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); - connection.close(); - - sender.send(Message("test-message"), true); - - // The session-scoped binding should be removed when receiver1's network connection is lost - Message in; - BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); -} - -namespace { -struct Fetcher : public qpid::sys::Runnable { - Receiver receiver; - Message message; - bool result; - qpid::messaging::Duration timeout; - bool timedOut; - - Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {} - void run() - { - qpid::sys::AbsTime start(qpid::sys::now()); - try { - result = receiver.fetch(message, timeout); - } catch (const MessagingException&) {} - qpid::sys::Duration timeTaken(start, qpid::sys::now()); - timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC; - } -}; -} - -QPID_AUTO_TEST_CASE(testConcurrentFetch) -{ - MessagingFixture fix; - Sender sender = fix.session.createSender("my-test-queue;{create:always, node : { x-declare : { auto-delete: true}}}"); - Receiver receiver = fix.session.createReceiver("my-test-queue"); - Fetcher fetcher(fix.session.createReceiver("amq.fanout")); - qpid::sys::Thread runner(fetcher); - Message out("test-message"); - for (int i = 0; i < 10; i++) {//try several times to make sure - sender.send(out, true); - //since the message is now on the queue, it should take less than the timeout to actually fetch it - qpid::sys::AbsTime start = qpid::sys::AbsTime::now(); - Message in; - BOOST_CHECK(receiver.fetch(in, qpid::messaging::Duration::SECOND*2)); - qpid::sys::Duration time(start, qpid::sys::AbsTime::now()); - BOOST_CHECK(time < qpid::sys::TIME_SEC*2); - if (time >= qpid::sys::TIME_SEC*2) break;//if we failed, no need to keep testing - } - fix.session.createSender("amq.fanout").send(out); - runner.join(); - BOOST_CHECK(fetcher.result); -} - -QPID_AUTO_TEST_CASE(testSimpleRequestResponse) -{ - QueueFixture fix; - //create receiver on temp queue for responses (using shorthand for temp queue) - Receiver r1 = fix.session.createReceiver("#"); - //send request - Sender s1 = fix.session.createSender(fix.queue); - Message original("test-message"); - original.setSubject("test-subject"); - original.setReplyTo(r1.getAddress()); - s1.send(original); - - //receive request and send response - Receiver r2 = fix.session.createReceiver(fix.queue); - Message m = r2.fetch(Duration::SECOND * 5); - Sender s2 = fix.session.createSender(m.getReplyTo()); - s2.send(m); - m = r1.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(m.getContent(), original.getContent()); - BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject()); -} - -QPID_AUTO_TEST_CASE(testSelfDestructQueue) -{ - MessagingFixture fix; - Session other = fix.connection.createSession(); - Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}"); - Receiver r2 = fix.session.createReceiver("amq.fanout"); - //send request - Sender s = fix.session.createSender("amq.fanout"); - for (uint i = 0; i < 20; ++i) { - s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); - } - try { - ScopedSuppressLogging sl; - for (uint i = 0; i < 20; ++i) { - r1.fetch(Duration::SECOND); - } - BOOST_FAIL("Expected exception."); - } catch (const qpid::messaging::MessagingException&) { - } - - for (uint i = 0; i < 20; ++i) { - BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testReroutingRingQueue) -{ - MessagingFixture fix; - Receiver r1 = fix.session.createReceiver("my-queue; {create:always, node:{x-declare:{alternate-exchange:amq.fanout, auto-delete:True, arguments:{qpid.max_count:10,qpid.policy_type:ring}}}}"); - Receiver r2 = fix.session.createReceiver("amq.fanout"); - - Sender s = fix.session.createSender("my-queue"); - for (uint i = 0; i < 20; ++i) { - s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); - } - for (uint i = 10; i < 20; ++i) { - BOOST_CHECK_EQUAL(r1.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); - } - for (uint i = 0; i < 10; ++i) { - BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); - } -} - -QPID_AUTO_TEST_CASE(testReleaseOnPriorityQueue) -{ - MessagingFixture fix; - std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10}}}}"); - std::string text("my message"); - Sender sender = fix.session.createSender(queue); - sender.send(Message(text)); - Receiver receiver = fix.session.createReceiver(queue); - Message msg; - for (uint i = 0; i < 10; ++i) { - if (receiver.fetch(msg, Duration::SECOND)) { - BOOST_CHECK_EQUAL(msg.getContent(), text); - fix.session.release(msg); - } else { - BOOST_FAIL("Released message not redelivered as expected."); - } - } - fix.session.acknowledge(); -} - -QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch) -{ - QueueFixture fix; - std::string first("first"); - std::string second("second"); - Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 10; ++i) { - sender.send(Message((boost::format("MSG_%1%") % (i+1)).str())); - } - Session txsession = fix.connection.createTransactionalSession(); - Receiver receiver = txsession.createReceiver(fix.queue); - receiver.setCapacity(9); - Message msg; - for (uint i = 0; i < 10; ++i) { - if (receiver.fetch(msg, Duration::SECOND)) { - BOOST_CHECK_EQUAL(msg.getContent(), std::string("MSG_1")); - txsession.rollback(); - } else { - BOOST_FAIL("Released message not redelivered as expected."); - break; - } - } - txsession.acknowledge(); - txsession.commit(); -} - -QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch) -{ - QueueFixture fix; - Receiver receiver = fix.session.createReceiver(fix.queue); - Fetcher fetcher(receiver); - qpid::sys::Thread runner(fetcher); - qpid::sys::usleep(500); - receiver.close(); - runner.join(); - BOOST_CHECK(!fetcher.timedOut); -} - -QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches) -{ - QueueFixture fix; - Receiver receiver = fix.session.createReceiver(fix.queue); - Receiver receiver2 = fix.session.createReceiver("amq.fanout"); - Receiver receiver3 = fix.session.createReceiver("amq.fanout"); - Fetcher fetcher(receiver); - Fetcher fetcher2(receiver2); - Fetcher fetcher3(receiver3); - qpid::sys::Thread runner(fetcher); - qpid::sys::Thread runner2(fetcher2); - qpid::sys::Thread runner3(fetcher3); - qpid::sys::usleep(500); - receiver.close(); - Message message("Test"); - fix.session.createSender("amq.fanout").send(message); - runner2.join(); - BOOST_CHECK(fetcher2.result); - BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent()); - runner3.join(); - BOOST_CHECK(fetcher3.result); - BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent()); - runner.join(); - BOOST_CHECK(!fetcher.timedOut); -} - -QPID_AUTO_TEST_CASE(testSessionCheckError) -{ - MessagingFixture fix; - Session session = fix.connection.createSession(); - Sender sender = session.createSender("q; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.max_count:1}}}}"); - ScopedSuppressLogging sl; - for (uint i = 0; i < 2; ++i) { - sender.send(Message((boost::format("A_%1%") % (i+1)).str())); - } - try { - while (true) session.checkError(); - } catch (const qpid::types::Exception&) { - //this is ok - } catch (const qpid::Exception&) { - BOOST_FAIL("Wrong exception type thrown"); - } -} - -QPID_AUTO_TEST_CASE(testImmediateNextReceiver) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test message"); - sender.send(out); - fix.session.createReceiver(fix.queue).setCapacity(1); - Receiver next; - qpid::sys::AbsTime start = qpid::sys::now(); - try { - while (!fix.session.nextReceiver(next, qpid::messaging::Duration::IMMEDIATE)) { - qpid::sys::Duration running(start, qpid::sys::now()); - if (running > 5*qpid::sys::TIME_SEC) { - throw qpid::types::Exception("Timed out spinning on nextReceiver(IMMEDIATE)"); - } - qpid::sys::usleep(1); // for valgrind - } - Message in; - BOOST_CHECK(next.fetch(in, qpid::messaging::Duration::IMMEDIATE)); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - next.close(); - } catch (const std::exception& e) { - BOOST_FAIL(e.what()); - } -} - -QPID_AUTO_TEST_CASE(testImmediateNextReceiverNoMessage) -{ - QueueFixture fix; - Receiver r = fix.session.createReceiver(fix.queue); - r.setCapacity(1); - Receiver next; - try { - BOOST_CHECK(!fix.session.nextReceiver(next, qpid::messaging::Duration::IMMEDIATE)); - r.close(); - } catch (const std::exception& e) { - BOOST_FAIL(e.what()); - } -} - -QPID_AUTO_TEST_CASE(testResendEmpty) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out("test-message"); - sender.send(out); - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - //set content on received message to empty string and resend - in.setContent(""); - sender.send(in); - in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), std::string()); -} - -QPID_AUTO_TEST_CASE(testResendMapAsString) -{ - QueueFixture fix; - Sender sender = fix.session.createSender(fix.queue); - Message out; - qpid::types::Variant::Map content; - content["foo"] = "bar"; - encode(content, out); - sender.send(out); - - Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); - //change content and resend - std::string newContent("something random"); - in.setContent(newContent); - in.setContentType(std::string());//it is no longer a map - sender.send(in); - in = receiver.fetch(Duration::SECOND * 5); - fix.session.acknowledge(); - BOOST_CHECK_EQUAL(in.getContent(), newContent); -} - -QPID_AUTO_TEST_CASE(testClientExpiration) -{ - QueueFixture fix; - Receiver receiver = fix.session.createReceiver(fix.queue); - receiver.setCapacity(5); - Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 5000; ++i) { - Message msg((boost::format("a_%1%") % (i+1)).str()); - msg.setSubject("a"); - msg.setTtl(Duration(10)); - sender.send(msg); - } - for (uint i = 0; i < 50; ++i) { - Message msg((boost::format("b_%1%") % (i+1)).str()); - msg.setSubject("b"); - sender.send(msg); - } - Message received; - bool done = false; - uint b_count = 0; - while (!done && receiver.fetch(received, Duration::IMMEDIATE)) { - if (received.getSubject() == "b") { - b_count++; - } - done = received.getContent() == "b_50"; - fix.session.acknowledge(); - } - BOOST_CHECK_EQUAL(b_count, 50); -} - -QPID_AUTO_TEST_CASE(testExpiredPrefetchOnClose) -{ - QueueFixture fix; - Receiver receiver = fix.session.createReceiver(fix.queue); - Session other = fix.connection.createSession(); - Receiver receiver2 = other.createReceiver("amq.fanout"); - receiver.setCapacity(500); - Sender sender = fix.session.createSender(fix.queue); - for (uint i = 0; i < 500; ++i) { - Message msg((boost::format("a_%1%") % (i+1)).str()); - msg.setSubject("a"); - msg.setTtl(Duration(5)); - sender.send(msg); - } - Sender sender2 = other.createSender("amq.fanout"); - sender2.send(Message("done")); - BOOST_CHECK_EQUAL(receiver2.fetch().getContent(), "done"); - qpid::sys::usleep(qpid::sys::TIME_MSEC*5);//sorry Alan, I can't see any way to avoid a sleep; need to ensure messages in prefetch have expired - receiver.close(); -} - -QPID_AUTO_TEST_CASE(testPriorityRingEviction) -{ - MessagingFixture fix; - std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10, qpid.max_count:5, qpid.policy_type:ring}}}}"); - Sender sender = fix.session.createSender(queue); - Receiver receiver = fix.session.createReceiver(queue); - std::vector<Message> acquired; - for (uint i = 0; i < 5; ++i) { - Message msg((boost::format("msg_%1%") % (i+1)).str()); - sender.send(msg); - } - //fetch but don't acknowledge messages, leaving them in acquired state - for (uint i = 0; i < 5; ++i) { - Message msg; - BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE)); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str()); - acquired.push_back(msg); - } - //send 5 more messages to the queue, which should cause all the - //acquired messages to be dropped - for (uint i = 5; i < 10; ++i) { - Message msg((boost::format("msg_%1%") % (i+1)).str()); - sender.send(msg); - } - //now release the acquired messages, which should have been evicted... - for (std::vector<Message>::iterator i = acquired.begin(); i != acquired.end(); ++i) { - fix.session.release(*i); - } - acquired.clear(); - //and check that the newest five are received - for (uint i = 5; i < 10; ++i) { - Message msg; - BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE)); - BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str()); - acquired.push_back(msg); - } - Message msg; - BOOST_CHECK(!receiver.fetch(msg, Duration::IMMEDIATE)); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests |
