summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-08-22 23:07:54 +0000
committerGordon Sim <gsim@apache.org>2014-08-22 23:07:54 +0000
commit1044edb5417c80edae3f211ec38dae4a58c4a647 (patch)
treef5cce89a0a70bc413427f5a12565bce7221fd0cb /qpid/cpp
parent50848155750de045ff7099f7ea3561f41a1887b8 (diff)
downloadqpid-python-1044edb5417c80edae3f211ec38dae4a58c4a647.tar.gz
QPID-6021: prevent protons internal output buffer growing too large
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp11
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
2 files changed, 11 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index e08b387e85..3bee64b9d1 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::
readHeader(false),
haveOutput(false),
state(DISCONNECTED),
- codecAdapter(*this)
+ codecAdapter(*this),
+ notifyOnWrite(false)
{
// Concatenate all known URLs into a single URL, get rid of duplicate addresses.
sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
@@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
checkClosed(ssn);
SenderContext::Delivery* delivery(0);
+ while (pn_transport_pending(engine) > 65536) {
+ QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written...");
+ notifyOnWrite = true;
+ wakeupDriver();
+ wait(ssn, snd);
+ notifyOnWrite = false;
+ }
while (!snd->send(message, &delivery)) {
QPID_LOG(debug, "Waiting for capacity...");
wait(ssn, snd);//wait for capacity
@@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
+ if (notifyOnWrite) lock.notifyAll();
return n;
} else if (n == PN_ERR) {
throw MessagingException(QPID_MSG("Error on output: " << getError()));
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 651cb736fd..12f3c21e0a 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -154,6 +154,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
} state;
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
+ bool notifyOnWrite;
void check();
bool checkDisconnected();