From 1044edb5417c80edae3f211ec38dae4a58c4a647 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 22 Aug 2014 23:07:54 +0000 Subject: QPID-6021: prevent protons internal output buffer growing too large git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619951 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 11 ++++++++++- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index e08b387e85..3bee64b9d1 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types:: readHeader(false), haveOutput(false), state(DISCONNECTED), - codecAdapter(*this) + codecAdapter(*this), + notifyOnWrite(false) { // Concatenate all known URLs into a single URL, get rid of duplicate addresses. sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? @@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shared_ptr ssn, boost::share qpid::sys::ScopedLock l(lock); checkClosed(ssn); SenderContext::Delivery* delivery(0); + while (pn_transport_pending(engine) > 65536) { + QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); + notifyOnWrite = true; + wakeupDriver(); + wait(ssn, snd); + notifyOnWrite = false; + } while (!snd->send(message, &delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity @@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) if (n > 0) { QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; + if (notifyOnWrite) lock.notifyAll(); return n; } else if (n == PN_ERR) { throw MessagingException(QPID_MSG("Error on output: " << getError())); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 651cb736fd..12f3c21e0a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -154,6 +154,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag } state; std::auto_ptr sasl; CodecAdapter codecAdapter; + bool notifyOnWrite; void check(); bool checkDisconnected(); -- cgit v1.2.1