From 99c51ceecc1a6b0cb46abbb509d2cecc41460b95 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 30 May 2013 18:41:21 +0000 Subject: QPID-4893: prevent uninitialised buffered messages being transfered git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1487960 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/amqp/Relay.cpp | 19 +++++++++++++------ cpp/src/qpid/broker/amqp/Relay.h | 5 +++-- 2 files changed, 16 insertions(+), 8 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/broker/amqp/Relay.cpp b/cpp/src/qpid/broker/amqp/Relay.cpp index 70d12455cd..a08971cb5c 100644 --- a/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/cpp/src/qpid/broker/amqp/Relay.cpp @@ -28,7 +28,7 @@ namespace qpid { namespace broker { namespace amqp { -Relay::Relay(size_t max_) : credit(0), max(max_), current(0), isDetached(false), out(0), in(0) {} +Relay::Relay(size_t max_) : credit(0), max(max_), head(0), tail(0), isDetached(false), out(0), in(0) {} void Relay::check() { if (isDetached) throw qpid::Exception("other end of relay has been detached"); @@ -38,8 +38,8 @@ bool Relay::send(pn_link_t* link) BufferedTransfer* c(0); { qpid::sys::ScopedLock l(lock); - if (current < buffer.size()) { - c = &buffer[current++]; + if (head < tail) { + c = &buffer[head++]; } else { return false; } @@ -59,6 +59,10 @@ void Relay::received(pn_link_t* link, pn_delivery_t* delivery) { BufferedTransfer& received = push(); received.initIn(link, delivery); + { + qpid::sys::ScopedLock l(lock); + ++tail; + } if (out) out->wakeup(); } size_t Relay::size() const @@ -66,7 +70,7 @@ size_t Relay::size() const qpid::sys::ScopedLock l(lock); return buffer.size(); } -BufferedTransfer& Relay::head() +BufferedTransfer& Relay::front() { qpid::sys::ScopedLock l(lock); return buffer.front(); @@ -75,7 +79,8 @@ void Relay::pop() { qpid::sys::ScopedLock l(lock); buffer.pop_front(); - if (current) --current; + if (head) --head; + if (tail) --tail; } void Relay::setCredit(int c) { @@ -100,12 +105,14 @@ void Relay::detached(Outgoing*) { out = 0; isDetached = true; + std::cerr << "Outgoing link detached from relay" << std::endl; if (in) in->wakeup(); } void Relay::detached(Incoming*) { in = 0; isDetached = true; + std::cerr << "Incoming link detached from relay" << std::endl; if (out) out->wakeup(); } @@ -182,7 +189,7 @@ IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& paren bool IncomingToRelay::settle() { bool result(false); - while (relay->size() && relay->head().settle()) { + while (relay->size() && relay->front().settle()) { result = true; relay->pop(); } diff --git a/cpp/src/qpid/broker/amqp/Relay.h b/cpp/src/qpid/broker/amqp/Relay.h index 19e0c2e3fe..0c2d48b346 100644 --- a/cpp/src/qpid/broker/amqp/Relay.h +++ b/cpp/src/qpid/broker/amqp/Relay.h @@ -68,7 +68,7 @@ class Relay Relay(size_t max); void check(); size_t size() const; - BufferedTransfer& head(); + BufferedTransfer& front(); void pop(); bool send(pn_link_t*); void received(pn_link_t* link, pn_delivery_t* delivery); @@ -82,7 +82,8 @@ class Relay std::deque buffer;//TODO: optimise by replacing with simple circular array int credit;//issued by outgoing peer, decremented everytime we send a message on outgoing link size_t max; - size_t current; + size_t head; + size_t tail; bool isDetached; Outgoing* out; Incoming* in; -- cgit v1.2.1