diff options
| author | Pavel Moravec <pmoravec@apache.org> | 2014-07-03 12:56:51 +0000 |
|---|---|---|
| committer | Pavel Moravec <pmoravec@apache.org> | 2014-07-03 12:56:51 +0000 |
| commit | 149d2d80191504cada7cb46454d5a364c1131a82 (patch) | |
| tree | 63a3101347e47e92f2440cb6a06d0e85811d6aa8 /qpid/cpp/src | |
| parent | 0b84fbe843ec6b15bbf9bcf2e159a48328f4c2f4 (diff) | |
| download | qpid-python-149d2d80191504cada7cb46454d5a364c1131a82.tar.gz | |
[QPID-5866]: [C++ client] AMQP 1.0 closing session without closing receiver first marks further messages as redelivered (previous commit not complete)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 19 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 |
2 files changed, 11 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index d29b2eae6f..9ed3713920 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -140,13 +140,9 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { //explicitly release messages that have yet to be fetched - for (boost::shared_ptr<ReceiverContext> lnk = ssn->nextReceiver(); lnk != boost::shared_ptr<ReceiverContext>(); lnk = ssn->nextReceiver()) { - for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { - pn_link_advance(lnk->receiver); - pn_delivery_update(d, PN_RELEASED); - pn_delivery_settle(d); - } - } + for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { + drain_and_release_messages(ssn, i->second); + } //wait for outstanding sends to settle while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle before closing"); @@ -338,9 +334,8 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha ssn->removeSender(lnk->getName()); } -void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); //Not all implementations handle drain correctly, so limit the @@ -356,6 +351,12 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha pn_delivery_update(d, PN_RELEASED); pn_delivery_settle(d); } +} + +void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + drain_and_release_messages(ssn, lnk); if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 59270f445d..651cb736fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -78,6 +78,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); + void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); |
