summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-08-22 23:07:54 +0000
committerGordon Sim <gsim@apache.org>2014-08-22 23:07:54 +0000
commit0d126dd4e671808e1f0da8af0b67982760d6d14b (patch)
tree65caf557ce892f76a877ba8f45b53f3215b98a0d /cpp
parent770c0cb060147ab1f9b0c6fb8989b1b38a20feee (diff)
downloadqpid-python-0d126dd4e671808e1f0da8af0b67982760d6d14b.tar.gz
QPID-6021: prevent protons internal output buffer growing too large
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1619951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.cpp11
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.h1
2 files changed, 11 insertions, 1 deletions
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index e08b387e85..3bee64b9d1 100644
--- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::
readHeader(false),
haveOutput(false),
state(DISCONNECTED),
- codecAdapter(*this)
+ codecAdapter(*this),
+ notifyOnWrite(false)
{
// Concatenate all known URLs into a single URL, get rid of duplicate addresses.
sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
@@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
+ while (pn_transport_pending(engine) > 65536) {
+ QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
+ notifyOnWrite = true;
+ wakeupDriver();
+ wait(ssn, snd);
+ notifyOnWrite = false;
+ }
while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
@@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
+ if (notifyOnWrite) lock.notifyAll();
return n;
} else if (n == PN_ERR) {
throw MessagingException(QPID_MSG("Error on output: " << getError()));
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 651cb736fd..12f3c21e0a 100644
--- a/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -154,6 +154,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
} state;
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
+ bool notifyOnWrite;
void check();
bool checkDisconnected();