diff options
| author | Alan Conway <aconway@apache.org> | 2014-12-01 17:41:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-12-01 17:41:09 +0000 |
| commit | 78d7f0727227f13da826180b2fe98f799160a93a (patch) | |
| tree | b0f91c3e7cc560e3f4842484b05a2957600dc449 /qpid/cpp/src | |
| parent | c32720a3ac1a909c92a4417184753ce98ddb3a59 (diff) | |
| download | qpid-python-78d7f0727227f13da826180b2fe98f799160a93a.tar.gz | |
QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker.
The problem was that messages for browsing receivers were being recorded on the
client SessionContext unacked list. This is incorrect since you don't ack
browsed messages. They remained on the list after the browsing receiver was
closed, and every subsequent call to acknowledge() on the client would attempt
to ack these messages for a no-longer-existing link. Fix is to not record
browsed messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
6 files changed, 10 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 3ee58cad8d..66aee1ae22 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -44,6 +44,7 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; + bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index fedab4286f..9e3f95742b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current)); + impl.setInternalId(ssn->record(current, lnk->getBrowse())); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 08cc130a9e..454106149d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -36,7 +36,9 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) {} + capacity(0), used(0) +{} + ReceiverContext::~ReceiverContext() { //pn_link_free(receiver); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 2b4e8e1986..8ded487bf3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -60,6 +60,8 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); + bool getBrowse() const { return helper.getBrowse(); } + private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 4e5d71f788..f2b7b24b4c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -110,10 +110,10 @@ uint32_t SessionContext::getUnsettledAcks() return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) { qpid::framing::SequenceNumber id = next++; - unacked[id] = delivery; + if (!browse) unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 8c2bb040a6..b347c327c5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*); + qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); |
