summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-30 18:41:21 +0000
committerGordon Sim <gsim@apache.org>2013-05-30 18:41:21 +0000
commit99c51ceecc1a6b0cb46abbb509d2cecc41460b95 (patch)
tree81e6f9cffe3dc535595671cbe4ebd48c652082af /cpp
parent21c68d7e86d6b10ec1ccbf8f91cfca778226bacc (diff)
downloadqpid-python-99c51ceecc1a6b0cb46abbb509d2cecc41460b95.tar.gz
QPID-4893: prevent uninitialised buffered messages being transfered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1487960 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/amqp/Relay.cpp19
-rw-r--r--cpp/src/qpid/broker/amqp/Relay.h5
2 files changed, 16 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/amqp/Relay.cpp b/cpp/src/qpid/broker/amqp/Relay.cpp
index 70d12455cd..a08971cb5c 100644
--- a/cpp/src/qpid/broker/amqp/Relay.cpp
+++ b/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -28,7 +28,7 @@ namespace qpid {
namespace broker {
namespace amqp {
-Relay::Relay(size_t max_) : credit(0), max(max_), current(0), isDetached(false), out(0), in(0) {}
+Relay::Relay(size_t max_) : credit(0), max(max_), head(0), tail(0), isDetached(false), out(0), in(0) {}
void Relay::check()
{
if (isDetached) throw qpid::Exception("other end of relay has been detached");
@@ -38,8 +38,8 @@ bool Relay::send(pn_link_t* link)
BufferedTransfer* c(0);
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
- if (current < buffer.size()) {
- c = &buffer[current++];
+ if (head < tail) {
+ c = &buffer[head++];
} else {
return false;
}
@@ -59,6 +59,10 @@ void Relay::received(pn_link_t* link, pn_delivery_t* delivery)
{
BufferedTransfer& received = push();
received.initIn(link, delivery);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ ++tail;
+ }
if (out) out->wakeup();
}
size_t Relay::size() const
@@ -66,7 +70,7 @@ size_t Relay::size() const
qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
return buffer.size();
}
-BufferedTransfer& Relay::head()
+BufferedTransfer& Relay::front()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
return buffer.front();
@@ -75,7 +79,8 @@ void Relay::pop()
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
buffer.pop_front();
- if (current) --current;
+ if (head) --head;
+ if (tail) --tail;
}
void Relay::setCredit(int c)
{
@@ -100,12 +105,14 @@ void Relay::detached(Outgoing*)
{
out = 0;
isDetached = true;
+ std::cerr << "Outgoing link detached from relay" << std::endl;
if (in) in->wakeup();
}
void Relay::detached(Incoming*)
{
in = 0;
isDetached = true;
+ std::cerr << "Incoming link detached from relay" << std::endl;
if (out) out->wakeup();
}
@@ -182,7 +189,7 @@ IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& paren
bool IncomingToRelay::settle()
{
bool result(false);
- while (relay->size() && relay->head().settle()) {
+ while (relay->size() && relay->front().settle()) {
result = true;
relay->pop();
}
diff --git a/cpp/src/qpid/broker/amqp/Relay.h b/cpp/src/qpid/broker/amqp/Relay.h
index 19e0c2e3fe..0c2d48b346 100644
--- a/cpp/src/qpid/broker/amqp/Relay.h
+++ b/cpp/src/qpid/broker/amqp/Relay.h
@@ -68,7 +68,7 @@ class Relay
Relay(size_t max);
void check();
size_t size() const;
- BufferedTransfer& head();
+ BufferedTransfer& front();
void pop();
bool send(pn_link_t*);
void received(pn_link_t* link, pn_delivery_t* delivery);
@@ -82,7 +82,8 @@ class Relay
std::deque<BufferedTransfer> buffer;//TODO: optimise by replacing with simple circular array
int credit;//issued by outgoing peer, decremented everytime we send a message on outgoing link
size_t max;
- size_t current;
+ size_t head;
+ size_t tail;
bool isDetached;
Outgoing* out;
Incoming* in;