diff options
| author | Gordon Sim <gsim@apache.org> | 2013-10-24 12:37:53 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-10-24 12:37:53 +0000 |
| commit | 1188148f10e2e814c58ca10e387cf1391b70ca35 (patch) | |
| tree | 332bcb53d7dd6ce120e39fb1b3b8dfecfd4aed0b /qpid/cpp/src/tests | |
| parent | 842f77bfeb53f5ceeb63cd9dc65b80defdb2266d (diff) | |
| download | qpid-python-1188148f10e2e814c58ca10e387cf1391b70ca35.tar.gz | |
QPID-4265: test closing of receiver with concurrent fetch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1535355 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 49 |
1 files changed, 47 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 323d7e8231..605faf7fdd 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1243,11 +1243,18 @@ struct Fetcher : public qpid::sys::Runnable { Receiver receiver; Message message; bool result; + qpid::messaging::Duration timeout; + bool timedOut; - Fetcher(Receiver r) : receiver(r), result(false) {} + Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {} void run() { - result = receiver.fetch(message, Duration::SECOND*10); + qpid::sys::AbsTime start(qpid::sys::now()); + try { + result = receiver.fetch(message, timeout); + } catch (const MessagingException&) {} + qpid::sys::Duration timeTaken(start, qpid::sys::now()); + timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC; } }; } @@ -1387,6 +1394,44 @@ QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch) txsession.commit(); } +QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch) +{ + QueueFixture fix; + Receiver receiver = fix.session.createReceiver(fix.queue); + Fetcher fetcher(receiver); + qpid::sys::Thread runner(fetcher); + ::usleep(500); + receiver.close(); + runner.join(); + BOOST_CHECK(!fetcher.timedOut); +} + +QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches) +{ + QueueFixture fix; + Receiver receiver = fix.session.createReceiver(fix.queue); + Receiver receiver2 = fix.session.createReceiver("amq.fanout"); + Receiver receiver3 = fix.session.createReceiver("amq.fanout"); + Fetcher fetcher(receiver); + Fetcher fetcher2(receiver2); + Fetcher fetcher3(receiver3); + qpid::sys::Thread runner(fetcher); + qpid::sys::Thread runner2(fetcher2); + qpid::sys::Thread runner3(fetcher3); + ::usleep(500); + receiver.close(); + Message message("Test"); + fix.session.createSender("amq.fanout").send(message); + runner2.join(); + BOOST_CHECK(fetcher2.result); + BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent()); + runner3.join(); + BOOST_CHECK(fetcher3.result); + BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent()); + runner.join(); + BOOST_CHECK(!fetcher.timedOut); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
