diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/tests/ClientSessionTest.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/ClientSessionTest.cpp')
-rw-r--r-- | qpid/cpp/src/tests/ClientSessionTest.cpp | 682 |
1 files changed, 682 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ClientSessionTest.cpp b/qpid/cpp/src/tests/ClientSessionTest.cpp new file mode 100644 index 0000000000..3c0cff7350 --- /dev/null +++ b/qpid/cpp/src/tests/ClientSessionTest.cpp @@ -0,0 +1,682 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "unit_test.h" +#include "test_tools.h" +#include "BrokerFixture.h" +#include "qpid/client/QueueOptions.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Time.h" +#include "qpid/client/Session.h" +#include "qpid/client/Message.h" +#include "qpid/framing/reply_exceptions.h" + +#include <boost/optional.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> + +#include <vector> + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(ClientSessionTest) + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid; +using qpid::sys::Monitor; +using qpid::sys::Thread; +using qpid::sys::TIME_SEC; +using qpid::broker::Broker; +using std::string; +using std::cout; +using std::endl; + + +struct DummyListener : public sys::Runnable, public MessageListener { + std::vector<Message> messages; + string name; + uint expected; + SubscriptionManager submgr; + + DummyListener(Session& session, const string& n, uint ex) : + name(n), expected(ex), submgr(session) {} + + void run() + { + submgr.subscribe(*this, name); + submgr.run(); + } + + void received(Message& msg) + { + messages.push_back(msg); + if (--expected == 0) { + submgr.stop(); + } + } +}; + +struct SimpleListener : public MessageListener +{ + Monitor lock; + std::vector<Message> messages; + + void received(Message& msg) + { + Monitor::ScopedLock l(lock); + messages.push_back(msg); + lock.notifyAll(); + } + + void waitFor(const uint n) + { + Monitor::ScopedLock l(lock); + while (messages.size() < n) { + lock.wait(); + } + } +}; + +struct ClientSessionFixture : public ProxySessionFixture +{ + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) { + session.queueDeclare(arg::queue="my-queue"); + } +}; + +QPID_AUTO_TEST_CASE(testQueueQuery) { + ClientSessionFixture fix; + fix.session = fix.connection.newSession(); + fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", + arg::exclusive=true, arg::autoDelete=true); + QueueQueryResult result = fix.session.queueQuery("q"); + BOOST_CHECK_EQUAL(false, result.getDurable()); + BOOST_CHECK_EQUAL(true, result.getExclusive()); + BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); +} + +QPID_AUTO_TEST_CASE(testDispatcher) +{ + ClientSessionFixture fix; + fix.session =fix.connection.newSession(); + size_t count = 100; + for (size_t i = 0; i < count; ++i) + fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); + DummyListener listener(fix.session, "my-queue", count); + listener.run(); + BOOST_CHECK_EQUAL(count, listener.messages.size()); + for (size_t i = 0; i < count; ++i) + BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); +} + +QPID_AUTO_TEST_CASE(testDispatcherThread) +{ + ClientSessionFixture fix; + fix.session =fix.connection.newSession(); + size_t count = 10; + DummyListener listener(fix.session, "my-queue", count); + sys::Thread t(listener); + for (size_t i = 0; i < count; ++i) { + fix.session.messageTransfer(arg::content=Message(boost::lexical_cast<string>(i), "my-queue")); + } + t.join(); + BOOST_CHECK_EQUAL(count, listener.messages.size()); + for (size_t i = 0; i < count; ++i) + BOOST_CHECK_EQUAL(boost::lexical_cast<string>(i), listener.messages[i].getData()); +} + +// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. +void testSuspend0Timeout() { + ClientSessionFixture fix; + fix.session.suspend(); // session has 0 timeout. + try { + fix.connection.resume(fix.session); + BOOST_FAIL("Expected InvalidArgumentException."); + } catch(const InternalErrorException&) {} +} + +QPID_AUTO_TEST_CASE(testUseSuspendedError) +{ + ClientSessionFixture fix; + fix.session.timeout(60); + fix.session.suspend(); + try { + fix.session.exchangeQuery(arg::exchange="amq.fanout"); + BOOST_FAIL("Expected session suspended exception"); + } catch(const NotAttachedException&) {} +} + +// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented. +void testSuspendResume() { + ClientSessionFixture fix; + fix.session.timeout(60); + fix.session.suspend(); + // Make sure we are still subscribed after resume. + fix.connection.resume(fix.session); + fix.session.messageTransfer(arg::content=Message("my-message", "my-queue")); + BOOST_CHECK_EQUAL("my-message", fix.subs.get("my-queue", TIME_SEC).getData()); +} + + +QPID_AUTO_TEST_CASE(testSendToSelf) { + ClientSessionFixture fix; + SimpleListener mylistener; + fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); + fix.subs.subscribe(mylistener, "myq"); + sys::Thread runner(fix.subs);//start dispatcher thread + string data("msg"); + Message msg(data, "myq"); + const uint count=10; + for (uint i = 0; i < count; ++i) { + fix.session.messageTransfer(arg::content=msg); + } + mylistener.waitFor(count); + fix.subs.cancel("myq"); + fix.subs.stop(); + runner.join(); + fix.session.close(); + BOOST_CHECK_EQUAL(mylistener.messages.size(), count); + for (uint j = 0; j < count; ++j) { + BOOST_CHECK_EQUAL(mylistener.messages[j].getData(), data); + } +} + +QPID_AUTO_TEST_CASE(testLocalQueue) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="lq", arg::exclusive=true, arg::autoDelete=true); + LocalQueue lq; + fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false)); + fix.session.messageTransfer(arg::content=Message("foo0", "lq")); + fix.session.messageTransfer(arg::content=Message("foo1", "lq")); + fix.session.messageTransfer(arg::content=Message("foo2", "lq")); + BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); + BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); + BOOST_CHECK(lq.empty()); // Credit exhausted. + fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); + BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); +} + +struct DelayedTransfer : sys::Runnable +{ + ClientSessionFixture& fixture; + + DelayedTransfer(ClientSessionFixture& f) : fixture(f) {} + + void run() + { + qpid::sys::sleep(1); + fixture.session.messageTransfer(arg::content=Message("foo2", "getq")); + } +}; + +QPID_AUTO_TEST_CASE(testGet) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="getq", arg::exclusive=true, arg::autoDelete=true); + fix.session.messageTransfer(arg::content=Message("foo0", "getq")); + fix.session.messageTransfer(arg::content=Message("foo1", "getq")); + Message got; + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo0", got.getData()); + BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC)); + BOOST_CHECK_EQUAL("foo1", got.getData()); + BOOST_CHECK(!fix.subs.get(got, "getq")); + DelayedTransfer sender(fix); + Thread t(sender); + //test timed get where message shows up after a short delay + BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC)); + BOOST_CHECK_EQUAL("foo2", got.getData()); + t.join(); +} + +QPID_AUTO_TEST_CASE(testOpenFailure) { + BrokerFixture b; + Connection c; + string host("unknowable-host"); + try { + c.open(host); + } catch (const Exception&) { + BOOST_CHECK(!c.isOpen()); + } + b.open(c); + BOOST_CHECK(c.isOpen()); + c.close(); + BOOST_CHECK(!c.isOpen()); +} + +QPID_AUTO_TEST_CASE(testPeriodicExpiration) { + Broker::Options opts; + opts.queueCleanInterval = 1; + opts.queueFlowStopRatio = 0; + opts.queueFlowResumeRatio = 0; + ClientSessionFixture fix(opts); + FieldTable args; + args.setInt("qpid.max_count",10); + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(500); + fix.session.messageTransfer(arg::content=m); + } + + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 10u); + qpid::sys::sleep(2); + BOOST_CHECK_EQUAL(fix.session.queueQuery(string("my-queue")).getMessageCount(), 5u); + fix.session.messageTransfer(arg::content=Message("Message_11", "my-queue"));//ensure policy is also updated +} + +QPID_AUTO_TEST_CASE(testExpirationOnPop) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + for (uint i = 0; i < 10; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + if (i % 2) m.getDeliveryProperties().setTtl(200); + fix.session.messageTransfer(arg::content=m); + } + + qpid::sys::usleep(300* 1000); + + for (uint i = 0; i < 10; i++) { + if (i % 2) continue; + Message m; + BOOST_CHECK(fix.subs.get(m, "my-queue", TIME_SEC)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + } +} + +QPID_AUTO_TEST_CASE(testRelease) { + ClientSessionFixture fix; + + const uint count=10; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + fix.subs.setAutoStop(false); + fix.subs.start(); + SubscriptionSettings settings; + settings.autoAck = 0; + + SimpleListener l1; + Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings); + l1.waitFor(count); + s1.cancel(); + + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData()); + } + s1.release(s1.getUnaccepted()); + + //check that released messages are redelivered + settings.autoAck = 1; + SimpleListener l2; + Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings); + l2.waitFor(count); + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData()); + } + + fix.subs.stop(); + fix.subs.wait(); + fix.session.close(); +} + +QPID_AUTO_TEST_CASE(testCompleteOnAccept) { + ClientSessionFixture fix; + const uint count = 8; + const uint chunk = 4; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + SubscriptionSettings settings; + settings.autoAck = 0; + settings.completionMode = COMPLETE_ON_ACCEPT; + settings.flowControl = FlowControl::messageWindow(chunk); + + LocalQueue q; + Subscription s = fix.subs.subscribe(q, "my-queue", settings); + fix.session.messageFlush(arg::destination=s.getName()); + SequenceSet accepted; + for (uint i = 0; i < chunk; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + Message m; + BOOST_CHECK(!q.get(m)); + + s.accept(accepted); + fix.session.messageFlush(arg::destination=s.getName()); + accepted.clear(); + + for (uint i = chunk; i < count; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + fix.session.messageAccept(accepted); +} + +namespace +{ +struct Publisher : qpid::sys::Runnable +{ + AsyncSession session; + Message message; + uint count; + Thread thread; + + Publisher(Connection& con, Message m, uint c) : session(con.newSession()), message(m), count(c) {} + + void start() + { + thread = Thread(*this); + } + + void join() + { + thread.join(); + } + + void run() + { + for (uint i = 0; i < count; i++) { + session.messageTransfer(arg::content=message); + } + session.sync(); + session.close(); + } +}; +} + +QPID_AUTO_TEST_CASE(testConcurrentSenders) +{ + //Ensure concurrent publishing sessions on a connection don't + //cause assertions, deadlocks or other undesirables: + BrokerFixture fix; + Connection connection; + ConnectionSettings settings; + settings.maxFrameSize = 1024; + settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); + connection.open(settings); + AsyncSession session = connection.newSession(); + Message message(string(512, 'X')); + + boost::ptr_vector<Publisher> publishers; + for (size_t i = 0; i < 5; i++) { + publishers.push_back(new Publisher(connection, message, 100)); + } + std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::start, _1)); + std::for_each(publishers.begin(), publishers.end(), boost::bind(&Publisher::join, _1)); + connection.close(); +} + + +QPID_AUTO_TEST_CASE(testExclusiveSubscribe) +{ + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="myq", arg::exclusive=true, arg::autoDelete=true); + SubscriptionSettings settings; + settings.exclusive = true; + LocalQueue q; + fix.subs.subscribe(q, "myq", settings, "first"); + //attempt to create new subscriber should fail + ScopedSuppressLogging sl; + BOOST_CHECK_THROW(fix.subs.subscribe(q, "myq", "second"), ResourceLockedException); + ; + +} + +QPID_AUTO_TEST_CASE(testExclusiveBinding) { + FieldTable options; + options.setString("qpid.exclusive-binding", "anything"); + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="queue-1", arg::exclusive=true, arg::autoDelete=true); + fix.session.queueDeclare(arg::queue="queue-2", arg::exclusive=true, arg::autoDelete=true); + fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-1", arg::bindingKey="my-key", arg::arguments=options); + fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message1", "my-key")); + fix.session.exchangeBind(arg::exchange="amq.direct", arg::queue="queue-2", arg::bindingKey="my-key", arg::arguments=options); + fix.session.messageTransfer(arg::destination="amq.direct", arg::content=Message("message2", "my-key")); + + Message got; + BOOST_CHECK(fix.subs.get(got, "queue-1")); + BOOST_CHECK_EQUAL("message1", got.getData()); + BOOST_CHECK(!fix.subs.get(got, "queue-1")); + + BOOST_CHECK(fix.subs.get(got, "queue-2")); + BOOST_CHECK_EQUAL("message2", got.getData()); + BOOST_CHECK(!fix.subs.get(got, "queue-2")); +} + +QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true); + LocalQueue p, q; + fix.subs.subscribe(p, "some-queue"); + fix.subs.cancel("some-queue"); + fix.subs.subscribe(q, "some-queue"); + + fix.session.messageTransfer(arg::content=Message("some-data", "some-queue")); + fix.session.messageFlush(arg::destination="some-queue"); + + Message got; + BOOST_CHECK(!p.get(got)); + + BOOST_CHECK(q.get(got)); + BOOST_CHECK_EQUAL("some-data", got.getData()); + BOOST_CHECK(!q.get(got)); +} + +QPID_AUTO_TEST_CASE(testReliableDispatch) { + ClientSessionFixture fix; + std::string queue("a-queue"); + fix.session.queueDeclare(arg::queue=queue, arg::autoDelete=true); + + ConnectionSettings settings; + settings.port = fix.broker->getPort(qpid::broker::Broker::TCP_TRANSPORT); + + Connection c1; + c1.open(settings); + Session s1 = c1.newSession(); + SubscriptionManager subs1(s1); + LocalQueue q1; + subs1.subscribe(q1, queue, FlowControl());//first subscriber has no credit + + Connection c2; + c2.open(settings); + Session s2 = c2.newSession(); + SubscriptionManager subs2(s2); + LocalQueue q2; + subs2.subscribe(q2, queue);//second subscriber has credit + + fix.session.messageTransfer(arg::content=Message("my-message", queue)); + + //check that the second consumer gets the message + Message got; + BOOST_CHECK(q2.get(got, 1*TIME_SEC)); + BOOST_CHECK_EQUAL("my-message", got.getData()); + + c1.close(); + c2.close(); +} + +QPID_AUTO_TEST_CASE(testSessionCloseOnInvalidSession) { + Session session; + session.close(); +} + +QPID_AUTO_TEST_CASE(testLVQVariedSize) { + ClientSessionFixture fix; + std::string queue("my-lvq"); + QueueOptions args; + args.setOrdering(LVQ_NO_BROWSE); + fix.session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args); + + std::string key; + args.getLVQKey(key); + + for (size_t i = 0; i < 10; i++) { + std::ostringstream data; + size_t size = 100 - ((i % 10) * 10); + data << std::string(size, 'x'); + + Message m(data.str(), queue); + m.getHeaders().setString(key, "abc"); + fix.session.messageTransfer(arg::content=m); + } +} + +QPID_AUTO_TEST_CASE(testSessionManagerSetFlowControl) { + ClientSessionFixture fix; + std::string name("dummy"); + LocalQueue queue; + SubscriptionSettings settings; + settings.flowControl = FlowControl(); + fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); + fix.subs.subscribe(queue, name, settings); + fix.session.messageTransfer(arg::content=Message("my-message", name)); + fix.subs.setFlowControl(name, 1, FlowControl::UNLIMITED, false); + fix.session.messageFlush(name); + Message got; + BOOST_CHECK(queue.get(got, 0)); + BOOST_CHECK_EQUAL("my-message", got.getData()); +} + +QPID_AUTO_TEST_CASE(testGetThenSubscribe) { + ClientSessionFixture fix; + std::string name("myqueue"); + fix.session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); + fix.session.messageTransfer(arg::content=Message("one", name)); + fix.session.messageTransfer(arg::content=Message("two", name)); + Message got; + BOOST_CHECK(fix.subs.get(got, name)); + BOOST_CHECK_EQUAL("one", got.getData()); + + DummyListener listener(fix.session, name, 1); + listener.run(); + BOOST_CHECK_EQUAL(1u, listener.messages.size()); + if (!listener.messages.empty()) { + BOOST_CHECK_EQUAL("two", listener.messages[0].getData()); + } +} + +QPID_AUTO_TEST_CASE(testSessionIsValid) { + ClientSessionFixture fix; + BOOST_CHECK(fix.session.isValid()); + Session session; + BOOST_CHECK(!session.isValid()); +} + +QPID_AUTO_TEST_CASE(testExpirationNotAltered) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue", arg::exclusive=true, arg::autoDelete=true); + + Message m("my-message", "my-queue"); + m.getDeliveryProperties().setTtl(60000); + m.getDeliveryProperties().setExpiration(12345); + fix.session.messageTransfer(arg::content=m); + Message got; + BOOST_CHECK(fix.subs.get(got, "my-queue")); + BOOST_CHECK_EQUAL("my-message", got.getData()); + BOOST_CHECK_EQUAL(12345u, got.getDeliveryProperties().getExpiration()); +} + +QPID_AUTO_TEST_CASE(testGetConnectionFromSession) { + ClientSessionFixture fix; + FieldTable options; + options.setInt("no-local", 1); + fix.session.queueDeclare(arg::queue="a", arg::exclusive=true, arg::autoDelete=true, arg::arguments=options); + fix.session.queueDeclare(arg::queue="b", arg::exclusive=true, arg::autoDelete=true); + + Connection c = fix.session.getConnection(); + Session s = c.newSession(); + //If this new session was created as expected on the same connection as + //fix.session, then the no-local behaviour means that queue 'a' + //will not enqueue messages from this new session but queue 'b' + //will. + s.messageTransfer(arg::content=Message("a", "a")); + s.messageTransfer(arg::content=Message("b", "b")); + + Message got; + BOOST_CHECK(fix.subs.get(got, "b")); + BOOST_CHECK_EQUAL("b", got.getData()); + BOOST_CHECK(!fix.subs.get(got, "a")); +} + + +QPID_AUTO_TEST_CASE(testQueueDeleted) +{ + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="my-queue"); + LocalQueue queue; + fix.subs.subscribe(queue, "my-queue"); + + ScopedSuppressLogging sl; + fix.session.queueDelete(arg::queue="my-queue"); + BOOST_CHECK_THROW(queue.get(1*qpid::sys::TIME_SEC), qpid::framing::ResourceDeletedException); +} + +QPID_AUTO_TEST_CASE(testTtl) +{ + const uint64_t ms = 1000ULL; // convert sec to ms + const uint64_t us = 1000ULL * 1000ULL; // convert sec to us + + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="ttl-test", arg::exclusive=true, arg::autoDelete=true); + Message msg1 = Message("AAA", "ttl-test"); + uint64_t ttl = 2 * ms; // 2 sec + msg1.getDeliveryProperties().setTtl(ttl); + Connection c = fix.session.getConnection(); + Session s = c.newSession(); + s.messageTransfer(arg::content=msg1); + + Message msg2 = Message("BBB", "ttl-test"); + ttl = 10 * ms; // 10 sec + msg2.getDeliveryProperties().setTtl(ttl); + s.messageTransfer(arg::content=msg2); + + qpid::sys::usleep(5 * us); // 5 sec + + // Message "AAA" should be expired and never be delivered + // Check "BBB" has ttl somewhere between 1 and 5 secs + Message got; + BOOST_CHECK(fix.subs.get(got, "ttl-test")); + BOOST_CHECK_EQUAL("BBB", got.getData()); + BOOST_CHECK(got.getDeliveryProperties().getTtl() > 1 * ms); + BOOST_CHECK(got.getDeliveryProperties().getTtl() < ttl - (5 * ms)); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests |