summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-12-01 17:41:09 +0000
committerAlan Conway <aconway@apache.org>2014-12-01 17:41:09 +0000
commit78d7f0727227f13da826180b2fe98f799160a93a (patch)
treeb0f91c3e7cc560e3f4842484b05a2957600dc449 /qpid/cpp/src
parentc32720a3ac1a909c92a4417184753ce98ddb3a59 (diff)
downloadqpid-python-78d7f0727227f13da826180b2fe98f799160a93a.tar.gz
QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker.
The problem was that messages for browsing receivers were being recorded on the client SessionContext unacked list. This is incorrect since you don't ack browsed messages. They remained on the list after the browsing receiver was closed, and every subsequent call to acknowledge() on the client would attempt to ack these messages for a no-longer-existing link. Fix is to not record browsed messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642720 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h2
6 files changed, 10 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index 3ee58cad8d..66aee1ae22 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -44,6 +44,7 @@ class AddressHelper
const qpid::types::Variant::Map& getNodeProperties() const;
bool getLinkSource(std::string& out) const;
bool getLinkTarget(std::string& out) const;
+ bool getBrowse() const { return browse; }
const qpid::types::Variant::Map& getLinkProperties() const;
static std::string getLinkName(const Address& address);
private:
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index fedab4286f..9e3f95742b 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: ");
encoded->init(impl);
impl.setEncoded(encoded);
- impl.setInternalId(ssn->record(current));
+ impl.setInternalId(ssn->record(current, lnk->getBrowse()));
pn_link_advance(lnk->receiver);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 08cc130a9e..454106149d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -36,7 +36,9 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co
address(a),
helper(address),
receiver(pn_receiver(session, name.c_str())),
- capacity(0), used(0) {}
+ capacity(0), used(0)
+{}
+
ReceiverContext::~ReceiverContext()
{
//pn_link_free(receiver);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 2b4e8e1986..8ded487bf3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -60,6 +60,8 @@ class ReceiverContext
void verify();
Address getAddress() const;
bool hasCurrent();
+ bool getBrowse() const { return helper.getBrowse(); }
+
private:
friend class ConnectionContext;
const std::string name;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 4e5d71f788..f2b7b24b4c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -110,10 +110,10 @@ uint32_t SessionContext::getUnsettledAcks()
return 0;//TODO
}
-qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
+qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse)
{
qpid::framing::SequenceNumber id = next++;
- unacked[id] = delivery;
+ if (!browse) unacked[id] = delivery;
QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
return id;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 8c2bb040a6..b347c327c5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -75,7 +75,7 @@ class SessionContext
qpid::framing::SequenceNumber next;
std::string name;
- qpid::framing::SequenceNumber record(pn_delivery_t*);
+ qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse);
void acknowledge();
void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);