diff options
Diffstat (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 49 |
1 files changed, 47 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 323d7e8231..605faf7fdd 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1243,11 +1243,18 @@ struct Fetcher : public qpid::sys::Runnable { Receiver receiver; Message message; bool result; + qpid::messaging::Duration timeout; + bool timedOut; - Fetcher(Receiver r) : receiver(r), result(false) {} + Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {} void run() { - result = receiver.fetch(message, Duration::SECOND*10); + qpid::sys::AbsTime start(qpid::sys::now()); + try { + result = receiver.fetch(message, timeout); + } catch (const MessagingException&) {} + qpid::sys::Duration timeTaken(start, qpid::sys::now()); + timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC; } }; } @@ -1387,6 +1394,44 @@ QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch) txsession.commit(); } +QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch) +{ + QueueFixture fix; + Receiver receiver = fix.session.createReceiver(fix.queue); + Fetcher fetcher(receiver); + qpid::sys::Thread runner(fetcher); + ::usleep(500); + receiver.close(); + runner.join(); + BOOST_CHECK(!fetcher.timedOut); +} + +QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches) +{ + QueueFixture fix; + Receiver receiver = fix.session.createReceiver(fix.queue); + Receiver receiver2 = fix.session.createReceiver("amq.fanout"); + Receiver receiver3 = fix.session.createReceiver("amq.fanout"); + Fetcher fetcher(receiver); + Fetcher fetcher2(receiver2); + Fetcher fetcher3(receiver3); + qpid::sys::Thread runner(fetcher); + qpid::sys::Thread runner2(fetcher2); + qpid::sys::Thread runner3(fetcher3); + ::usleep(500); + receiver.close(); + Message message("Test"); + fix.session.createSender("amq.fanout").send(message); + runner2.join(); + BOOST_CHECK(fetcher2.result); + BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent()); + runner3.join(); + BOOST_CHECK(fetcher3.result); + BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent()); + runner.join(); + BOOST_CHECK(!fetcher.timedOut); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
