summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorPavel Moravec <pmoravec@apache.org>2014-07-03 12:56:51 +0000
committerPavel Moravec <pmoravec@apache.org>2014-07-03 12:56:51 +0000
commit149d2d80191504cada7cb46454d5a364c1131a82 (patch)
tree63a3101347e47e92f2440cb6a06d0e85811d6aa8 /qpid/cpp/src
parent0b84fbe843ec6b15bbf9bcf2e159a48328f4c2f4 (diff)
downloadqpid-python-149d2d80191504cada7cb46454d5a364c1131a82.tar.gz
[QPID-5866]: [C++ client] AMQP 1.0 closing session without closing receiver first marks further messages as redelivered (previous commit not complete)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607628 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp19
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
2 files changed, 11 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index d29b2eae6f..9ed3713920 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -140,13 +140,9 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
//explicitly release messages that have yet to be fetched
- for (boost::shared_ptr<ReceiverContext> lnk = ssn->nextReceiver(); lnk != boost::shared_ptr<ReceiverContext>(); lnk = ssn->nextReceiver()) {
- for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) {
- pn_link_advance(lnk->receiver);
- pn_delivery_update(d, PN_RELEASED);
- pn_delivery_settle(d);
- }
- }
+ for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) {
+ drain_and_release_messages(ssn, i->second);
+ }
//wait for outstanding sends to settle
while (!ssn->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
@@ -338,9 +334,8 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
ssn->removeSender(lnk->getName());
}
-void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+void ConnectionContext::drain_and_release_messages(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
//Not all implementations handle drain correctly, so limit the
@@ -356,6 +351,12 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
pn_delivery_update(d, PN_RELEASED);
pn_delivery_settle(d);
}
+}
+
+void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ drain_and_release_messages(ssn, lnk);
if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
lnk->close();
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 59270f445d..651cb736fd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -78,6 +78,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void attach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
+ void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>);
void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync);
bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout);