diff options
Diffstat (limited to 'qpid/cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 663 |
1 files changed, 0 insertions, 663 deletions
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp deleted file mode 100644 index f35524c0c0..0000000000 --- a/qpid/cpp/src/tests/ClientSessionTest.cpp +++ /dev/null @@ -1,663 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "unit_test.h" -#include "test_tools.h" -#include "BrokerFixture.h" -#include "qpid/client/QueueOptions.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Time.h" -#include "qpid/client/Session.h" -#include "qpid/client/Message.h" -#include "qpid/framing/reply_exceptions.h" - -#include <boost/optional.hpp> -#include <boost/lexical_cast.hpp> -#include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> -#include <boost/format.hpp> - -#include <vector> - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(ClientSessionTest) - -using namespace qpid::client; -using namespace qpid::framing; -using namespace qpid; -using qpid::sys::Monitor; -using qpid::sys::Thread; -using qpid::sys::TIME_SEC; -using qpid::broker::BrokerOptions; -using std::string; -using std::cout; -using std::endl; - - -struct DummyListener : public sys::Runnable, public MessageListener { - std::vector<Message> messages; - string name; - uint expected; - SubscriptionManager submgr; - - DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), submgr(session) {} - - void run() - { - submgr.subscribe(*this, name); - submgr.run(); - } - - void received(Message& msg) - { - messages.push_back(msg); - if (--expected == 0) { - submgr.stop(); - } - } -}; - -struct SimpleListener : public MessageListener -{ - Monitor lock; - std::vector<Message> messages; - - void received(Message& msg) - { - Monitor::ScopedLock l(lock); - messages.push_back(msg); - lock.notifyAll(); - } - - void waitFor(const uint n) - { - Monitor::ScopedLock l(lock); - while (messages.size() < n) { - lock.wait(); - } - } -}; - -struct ClientSessionFixture : public SessionFixture -{ - ClientSessionFixture(const BrokerOptions& opts = BrokerOptions()) : SessionFixture(opts) { - session.queueDeclare(arg::queue="my-queue"); - } -}; - -QPID_AUTO_TEST_CASE(testQueueQuery) { - ClientSessionFixture fix; - fix.session = fix.connection.newSession(); - fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", - arg::exclusive=true, arg::autoDelete=true); - QueueQueryResult result = fix.session.queueQuery("q"); - BOOST_CHECK_EQUAL(false, result.getDurable()); - BOOST_CHECK_EQUAL(true, result.getExclusive()); - BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); -} - -QPID_AUTO_TEST_CASE(testDispatcher) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(); - size_t count = 100; - for (size_t i = 0; i < count; ++i) - fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); - DummyListener listener(fix.session, "my-queue", count); - listener.run(); - BOOST_CHECK_EQUAL(count, listener.messages.size()); - for (size_t i = 0; i < count; ++i) - BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); -} - -QPID_AUTO_TEST_CASE(testDispatcherThread) -{ - ClientSessionFixture fix; - fix.session =fix.connection.newSession(); - size_t count = 10; - DummyListener listener(fix.session, "my-queue", count); - sys::Thread t(listener); - for (size_t i = 0; i < count; ++i) { - fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); - } - t.join(); - BOOST_CHECK_EQUAL(count, listener.messages.size()); - for (size_t i = 0; i < count; ++i) - BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); -} - -QPID_AUTO_TEST_CASE(testUseSuspendedError) -{ - ClientSessionFixture fix; - fix.session.timeout(60); - fix.session.suspend(); - try { - fix.session.exchangeQuery(arg::exchange="amq.fanout"); - BOOST_FAIL("Expected session suspended exception"); - } catch(const NotAttachedException&) {} -} - -QPID_AUTO_TEST_CASE(testSendToSelf) { - ClientSessionFixture fix; - SimpleListener mylistener; - fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); - fix.subs.subscribe(mylistener, "myq"); - sys::Thread runner(fix.subs);//start dispatcher thread - string data("msg"); - Message msg(data, "myq"); - const uint count=10; - for (uint i = 0; i < count; ++i) { - fix.session.messageTransfer(arg::content=msg); - } - mylistener.waitFor(count); - fix.subs.cancel("myq"); - fix.subs.stop(); - runner.join(); - fix.session.close(); - BOOST_CHECK_EQUAL(mylistener.messages.size(), count); - for (uint j = 0; j < count; ++j) { - BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data); - } -} - -QPID_AUTO_TEST_CASE(testLocalQueue) { - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="lq", arg::exclusive=true, arg::autoDelete=true); - LocalQueue lq; - fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false)); - fix.session.messageTransfer(arg::content=Message("foo0", "lq")); - fix.session.messageTransfer(arg::content=Message("foo1", "lq")); - fix.session.messageTransfer(arg::content=Message("foo2", "lq")); - BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); - BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); - BOOST_CHECK(lq.empty()); // Credit exhausted. - fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); - BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); -} - -struct DelayedTransfer : sys::Runnable -{ - ClientSessionFixture& fixture; - - DelayedTransfer(ClientSessionFixture& f) : fixture(f) {} - - void run() - { - qpid::sys::sleep(1); - fixture.session.messageTransfer(arg::content=Message("foo2", "getq")); - } -}; - -QPID_AUTO_TEST_CASE(testGet) { - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="getq", arg::exclusive=true, arg::autoDelete=true); - fix.session.messageTransfer(arg::content=Message("foo0", "getq")); - fix.session.messageTransfer(arg::content=Message("foo1", "getq")); - Message got; - BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); - BOOST_CHECK_EQUAL("foo0", got.getData()); - BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); - BOOST_CHECK_EQUAL("foo1", got.getData()); - BOOST_CHECK(!fix.subs.get(got, "getq")); - DelayedTransfer sender(fix); - Thread t(sender); - //test timed get where message shows up after a short delay - BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC)); - BOOST_CHECK_EQUAL("foo2", got.getData()); - t.join(); -} - -QPID_AUTO_TEST_CASE(testOpenFailure) { - BrokerFixture b; - Connection c; - string host("unknowable-host"); - try { - c.open(host); - } catch (const Exception&) { - BOOST_CHECK(!c.isOpen()); - } - b.open(c); - BOOST_CHECK(c.isOpen()); - c.close(); - BOOST_CHECK(!c.isOpen()); -} - -QPID_AUTO_TEST_CASE(testPeriodicExpiration) { - BrokerOptions opts; - opts.queueCleanInterval = 1*TIME_SEC; - opts.queueFlowStopRatio = 0; - opts.queueFlowResumeRatio = 0; - ClientSessionFixture fix(opts); - FieldTable args; - args.setInt("qpid.max_count",10); - fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - - for (uint i = 0; i < 10; i++) { - Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); - if (i % 2) m.getDeliveryProperties().setTtl(500); - fix.session.messageTransfer(arg::content=m); - } - - BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); - qpid::sys::sleep(2); - BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); - fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated -} - -QPID_AUTO_TEST_CASE(testExpirationOnPop) { - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); - - for (uint i = 0; i < 10; i++) { - Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); - if (i % 2) m.getDeliveryProperties().setTtl(200); - fix.session.messageTransfer(arg::content=m); - } - - qpid::sys::usleep(300* 1000); - - for (uint i = 0; i < 10; i++) { - if (i % 2) continue; - Message m; - BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC)); - BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); - } -} - -QPID_AUTO_TEST_CASE(testRelease) { - ClientSessionFixture fix; - - const uint count=10; - for (uint i = 0; i < count; i++) { - Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); - fix.session.messageTransfer(arg::content=m); - } - - fix.subs.setAutoStop(false); - fix.subs.start(); - SubscriptionSettings settings; - settings.autoAck = 0; - - SimpleListener l1; - Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings); - l1.waitFor(count); - s1.cancel(); - - for (uint i = 0; i < count; i++) { - BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData()); - } - s1.release(s1.getUnaccepted()); - - //check that released messages are redelivered - settings.autoAck = 1; - SimpleListener l2; - Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings); - l2.waitFor(count); - for (uint i = 0; i < count; i++) { - BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData()); - } - - fix.subs.stop(); - fix.subs.wait(); - fix.session.close(); -} - -QPID_AUTO_TEST_CASE(testCompleteOnAccept) { - ClientSessionFixture fix; - const uint count = 8; - const uint chunk = 4; - for (uint i = 0; i < count; i++) { - Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); - fix.session.messageTransfer(arg::content=m); - } - - SubscriptionSettings settings; - settings.autoAck = 0; - settings.completionMode = COMPLETE_ON_ACCEPT; - settings.flowControl = FlowControl::messageWindow(chunk); - - LocalQueue q; - Subscription s = fix.subs.subscribe(q, "my-queue", settings); - fix.session.messageFlush(arg::destination=s.getName()); - SequenceSet accepted; - for (uint i = 0; i < chunk; i++) { - Message m; - BOOST_CHECK(q.get(m)); - BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); - accepted.add(m.getId()); - } - Message m; - BOOST_CHECK(!q.get(m)); - - s.accept(accepted); - //need to reallocate credit as we have flushed it all out - s.setFlowControl(FlowControl::messageWindow(chunk)); - fix.session.messageFlush(arg::destination=s.getName()); - accepted.clear(); - - for (uint i = chunk; i < count; i++) { - Message m; - BOOST_CHECK(q.get(m)); - BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); - accepted.add(m.getId()); - } - fix.session.messageAccept(accepted); -} - -namespace -{ -struct Publisher : qpid::sys::Runnable -{ - AsyncSession session; - Message message; - uint count; - Thread thread; - - Publisher(Connection& con, Message m, uint c) : session(con.newSession()), message(m), count(c) {} - - void start() - { - thread = Thread(*this); - } - - void join() - { - thread.join(); - } - - void run() - { - for (uint i = 0; i < count; i++) { - session.messageTransfer(arg::content=message); - } - session.sync(); - session.close(); - } -}; -} - -QPID_AUTO_TEST_CASE(testConcurrentSenders) -{ - //Ensure concurrent publishing sessions on a connection don't - //cause assertions, deadlocks or other undesirables: - BrokerFixture fix; - Connection connection; - ConnectionSettings settings; - settings.maxFrameSize = 1024; - settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); - connection.open(settings); - AsyncSession session = connection.newSession(); - Message message(string(512, 'X')); - - boost::ptr_vector<Publisher> publishers; - for (size_t i = 0; i < 5; i++) { - publishers.push_back(new Publisher(connection, message, 100)); - } - std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::start, _1)); - std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::join, _1)); - connection.close(); -} - - -QPID_AUTO_TEST_CASE(testExclusiveSubscribe) -{ - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); - SubscriptionSettings settings; - settings.exclusive = true; - LocalQueue q; - fix.subs.subscribe(q, "myq", settings, "first"); - //attempt to create new subscriber should fail - ScopedSuppressLogging sl; - BOOST_CHECK_THROW(fix.subs.subscribe(q, "myq", "second"), ResourceLockedException); - ; - -} - -QPID_AUTO_TEST_CASE(testExclusiveBinding) { - FieldTable options; - options.setString("qpid.exclusive-binding", "anything"); - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="queue-1", arg::exclusive=true, arg::autoDelete=true); - fix.session.queueDeclare(arg::queue="queue-2", arg::exclusive=true, arg::autoDelete=true); - fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-1", arg::bindingKey="my-key", arg::arguments=options); - fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message1", "my-key")); - fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-2", arg::bindingKey="my-key", arg::arguments=options); - fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message2", "my-key")); - - Message got; - BOOST_CHECK(fix.subs.get(got, "queue-1")); - BOOST_CHECK_EQUAL("message1", got.getData()); - BOOST_CHECK(!fix.subs.get(got, "queue-1")); - - BOOST_CHECK(fix.subs.get(got, "queue-2")); - BOOST_CHECK_EQUAL("message2", got.getData()); - BOOST_CHECK(!fix.subs.get(got, "queue-2")); -} - -QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) { - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true); - LocalQueue p, q; - fix.subs.subscribe(p, "some-queue"); - fix.subs.cancel("some-queue"); - fix.subs.subscribe(q, "some-queue"); - - fix.session.messageTransfer(arg::content=Message("some-data", "some-queue")); - fix.session.messageFlush(arg::destination="some-queue"); - - Message got; - BOOST_CHECK(!p.get(got)); - - BOOST_CHECK(q.get(got)); - BOOST_CHECK_EQUAL("some-data", got.getData()); - BOOST_CHECK(!q.get(got)); -} - -QPID_AUTO_TEST_CASE(testReliableDispatch) { - ClientSessionFixture fix; - std::string queue("a-queue"); - fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true); - - ConnectionSettings settings; - settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); - - Connection c1; - c1.open(settings); - Session s1 = c1.newSession(); - SubscriptionManager subs1(s1); - LocalQueue q1; - subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit - - Connection c2; - c2.open(settings); - Session s2 = c2.newSession(); - SubscriptionManager subs2(s2); - LocalQueue q2; - subs2.subscribe(q2, queue);//second subscriber has credit - - fix.session.messageTransfer(arg::content=Message("my-message", queue)); - - //check that the second consumer gets the message - Message got; - BOOST_CHECK(q2.get(got, 1*TIME_SEC)); - BOOST_CHECK_EQUAL("my-message", got.getData()); - - c1.close(); - c2.close(); -} - -QPID_AUTO_TEST_CASE(testSessionCloseOnInvalidSession) { - Session session; - session.close(); -} - -QPID_AUTO_TEST_CASE(testLVQVariedSize) { - ClientSessionFixture fix; - std::string queue("my-lvq"); - QueueOptions args; - args.setOrdering(LVQ_NO_BROWSE); - fix.session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); - - std::string key; - args.getLVQKey(key); - - for (size_t i = 0; i < 10; i++) { - std::ostringstream data; - size_t size = 100 - ((i % 10) * 10); - data << std::string(size, 'x'); - - Message m(data.str(), queue); - m.getHeaders().setString(key, "abc"); - fix.session.messageTransfer(arg::content=m); - } -} - -QPID_AUTO_TEST_CASE(testSessionManagerSetFlowControl) { - ClientSessionFixture fix; - std::string name("dummy"); - LocalQueue queue; - SubscriptionSettings settings; - settings.flowControl = FlowControl(); - fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); - fix.subs.subscribe(queue, name, settings); - fix.session.messageTransfer(arg::content=Message("my-message", name)); - fix.subs.setFlowControl(name, 1, FlowControl::UNLIMITED, false); - fix.session.messageFlush(name); - Message got; - BOOST_CHECK(queue.get(got, 0)); - BOOST_CHECK_EQUAL("my-message", got.getData()); -} - -QPID_AUTO_TEST_CASE(testGetThenSubscribe) { - ClientSessionFixture fix; - std::string name("myqueue"); - fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); - fix.session.messageTransfer(arg::content=Message("one", name)); - fix.session.messageTransfer(arg::content=Message("two", name)); - Message got; - BOOST_CHECK(fix.subs.get(got, name)); - BOOST_CHECK_EQUAL("one", got.getData()); - - DummyListener listener(fix.session, name, 1); - listener.run(); - BOOST_CHECK_EQUAL(1u, listener.messages.size()); - if (!listener.messages.empty()) { - BOOST_CHECK_EQUAL("two", listener.messages[0].getData()); - } -} - -QPID_AUTO_TEST_CASE(testSessionIsValid) { - ClientSessionFixture fix; - BOOST_CHECK(fix.session.isValid()); - Session session; - BOOST_CHECK(!session.isValid()); -} - -QPID_AUTO_TEST_CASE(testExpirationNotAltered) { - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); - - Message m("my-message", "my-queue"); - m.getDeliveryProperties().setTtl(60000); - m.getDeliveryProperties().setExpiration(12345); - fix.session.messageTransfer(arg::content=m); - Message got; - BOOST_CHECK(fix.subs.get(got, "my-queue")); - BOOST_CHECK_EQUAL("my-message", got.getData()); - BOOST_CHECK_EQUAL(12345u, got.getDeliveryProperties().getExpiration()); -} - -QPID_AUTO_TEST_CASE(testGetConnectionFromSession) { - ClientSessionFixture fix; - FieldTable options; - options.setInt("no-local", 1); - fix.session.queueDeclare(arg::queue="a", arg::exclusive=true, arg::autoDelete=true, arg::arguments=options); - fix.session.queueDeclare(arg::queue="b", arg::exclusive=true, arg::autoDelete=true); - - Connection c = fix.session.getConnection(); - Session s = c.newSession(); - //If this new session was created as expected on the same connection as - //fix.session, then the no-local behaviour means that queue 'a' - //will not enqueue messages from this new session but queue 'b' - //will. - s.messageTransfer(arg::content=Message("a", "a")); - s.messageTransfer(arg::content=Message("b", "b")); - - Message got; - BOOST_CHECK(fix.subs.get(got, "b")); - BOOST_CHECK_EQUAL("b", got.getData()); - BOOST_CHECK(!fix.subs.get(got, "a")); -} - - -QPID_AUTO_TEST_CASE(testQueueDeleted) -{ - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="my-queue"); - LocalQueue queue; - fix.subs.subscribe(queue, "my-queue"); - - ScopedSuppressLogging sl; - fix.session.queueDelete(arg::queue="my-queue"); - BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); -} - -QPID_AUTO_TEST_CASE(testTtl) -{ - const uint64_t ms = 1000ULL; // convert sec to ms - const uint64_t us = 1000ULL * 1000ULL; // convert sec to us - - ClientSessionFixture fix; - fix.session.queueDeclare(arg::queue="ttl-test", arg::exclusive=true, arg::autoDelete=true); - Message msg1 = Message("AAA", "ttl-test"); - uint64_t ttl = 2 * ms; // 2 sec - msg1.getDeliveryProperties().setTtl(ttl); - Connection c = fix.session.getConnection(); - Session s = c.newSession(); - s.messageTransfer(arg::content=msg1); - - Message msg2 = Message("BBB", "ttl-test"); - ttl = 10 * ms; // 10 sec - msg2.getDeliveryProperties().setTtl(ttl); - s.messageTransfer(arg::content=msg2); - - qpid::sys::usleep(5 * us); // 5 sec - - // Message "AAA" should be expired and never be delivered - // Check "BBB" has ttl somewhere between 1 and 5 secs - Message got; - BOOST_CHECK(fix.subs.get(got, "ttl-test")); - BOOST_CHECK_EQUAL("BBB", got.getData()); - BOOST_CHECK(got.getDeliveryProperties().getTtl() > 1 * ms); - BOOST_CHECK(got.getDeliveryProperties().getTtl() < ttl - (5 * ms)); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests |