From df00f9ea35f786b9a8c7186e40c24a3c64c84cff Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 28 Nov 2013 17:59:44 +0000 Subject: QPID-5378: track outstanding fetches and for receivers with zero capaicty, reissue credit correctly on reconnect git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1546415 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 11 ++++++++++- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9d8cf960ca..e1d72a3af7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -140,6 +140,11 @@ void ConnectionContext::close() bool ConnectionContext::fetch(boost::shared_ptr ssn, boost::shared_ptr lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { + /** + * For fetch() on a receiver with zero capacity, need to reissue the + * credit on reconnect, so track the fetches in progress. + */ + qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); { qpid::sys::ScopedLock l(lock); checkClosed(ssn, lnk); @@ -535,7 +540,11 @@ void ConnectionContext::restartSession(boost::shared_ptr s) } for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) { QPID_LOG(debug, id << " reattaching receiver " << i->first); - attach(s, i->second->receiver, i->second->capacity); + if (i->second->capacity) { + attach(s, i->second->receiver, i->second->capacity); + } else { + attach(s, i->second->receiver, (uint32_t) i->second->fetching); + } i->second->verify(); QPID_LOG(debug, id << " receiver " << i->first << " reattached"); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index c68ea10ba3..59c0533c9a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -24,6 +24,7 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" #include +#include "qpid/sys/AtomicCount.h" #include "qpid/sys/IntegerTypes.h" struct pn_link_t; @@ -65,6 +66,7 @@ class ReceiverContext AddressHelper helper; pn_link_t* receiver; uint32_t capacity; + qpid::sys::AtomicCount fetching; void configure(pn_terminus_t*); }; }}} // namespace qpid::messaging::amqp -- cgit v1.2.1