diff options
| author | Gordon Sim <gsim@apache.org> | 2013-05-30 18:41:21 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-05-30 18:41:21 +0000 |
| commit | 99c51ceecc1a6b0cb46abbb509d2cecc41460b95 (patch) | |
| tree | 81e6f9cffe3dc535595671cbe4ebd48c652082af /cpp | |
| parent | 21c68d7e86d6b10ec1ccbf8f91cfca778226bacc (diff) | |
| download | qpid-python-99c51ceecc1a6b0cb46abbb509d2cecc41460b95.tar.gz | |
QPID-4893: prevent uninitialised buffered messages being transfered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1487960 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Relay.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Relay.h | 5 |
2 files changed, 16 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/amqp/Relay.cpp b/cpp/src/qpid/broker/amqp/Relay.cpp index 70d12455cd..a08971cb5c 100644 --- a/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/cpp/src/qpid/broker/amqp/Relay.cpp @@ -28,7 +28,7 @@ namespace qpid { namespace broker { namespace amqp { -Relay::Relay(size_t max_) : credit(0), max(max_), current(0), isDetached(false), out(0), in(0) {} +Relay::Relay(size_t max_) : credit(0), max(max_), head(0), tail(0), isDetached(false), out(0), in(0) {} void Relay::check() { if (isDetached) throw qpid::Exception("other end of relay has been detached"); @@ -38,8 +38,8 @@ bool Relay::send(pn_link_t* link) BufferedTransfer* c(0); { qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); - if (current < buffer.size()) { - c = &buffer[current++]; + if (head < tail) { + c = &buffer[head++]; } else { return false; } @@ -59,6 +59,10 @@ void Relay::received(pn_link_t* link, pn_delivery_t* delivery) { BufferedTransfer& received = push(); received.initIn(link, delivery); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + ++tail; + } if (out) out->wakeup(); } size_t Relay::size() const @@ -66,7 +70,7 @@ size_t Relay::size() const qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); return buffer.size(); } -BufferedTransfer& Relay::head() +BufferedTransfer& Relay::front() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); return buffer.front(); @@ -75,7 +79,8 @@ void Relay::pop() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); buffer.pop_front(); - if (current) --current; + if (head) --head; + if (tail) --tail; } void Relay::setCredit(int c) { @@ -100,12 +105,14 @@ void Relay::detached(Outgoing*) { out = 0; isDetached = true; + std::cerr << "Outgoing link detached from relay" << std::endl; if (in) in->wakeup(); } void Relay::detached(Incoming*) { in = 0; isDetached = true; + std::cerr << "Incoming link detached from relay" << std::endl; if (out) out->wakeup(); } @@ -182,7 +189,7 @@ IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& paren bool IncomingToRelay::settle() { bool result(false); - while (relay->size() && relay->head().settle()) { + while (relay->size() && relay->front().settle()) { result = true; relay->pop(); } diff --git a/cpp/src/qpid/broker/amqp/Relay.h b/cpp/src/qpid/broker/amqp/Relay.h index 19e0c2e3fe..0c2d48b346 100644 --- a/cpp/src/qpid/broker/amqp/Relay.h +++ b/cpp/src/qpid/broker/amqp/Relay.h @@ -68,7 +68,7 @@ class Relay Relay(size_t max); void check(); size_t size() const; - BufferedTransfer& head(); + BufferedTransfer& front(); void pop(); bool send(pn_link_t*); void received(pn_link_t* link, pn_delivery_t* delivery); @@ -82,7 +82,8 @@ class Relay std::deque<BufferedTransfer> buffer;//TODO: optimise by replacing with simple circular array int credit;//issued by outgoing peer, decremented everytime we send a message on outgoing link size_t max; - size_t current; + size_t head; + size_t tail; bool isDetached; Outgoing* out; Incoming* in; |
