diff options
| author | Gordon Sim <gsim@apache.org> | 2014-08-22 23:07:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-08-22 23:07:54 +0000 |
| commit | 1044edb5417c80edae3f211ec38dae4a58c4a647 (patch) | |
| tree | f5cce89a0a70bc413427f5a12565bce7221fd0cb /qpid/cpp | |
| parent | 50848155750de045ff7099f7ea3561f41a1887b8 (diff) | |
| download | qpid-python-1044edb5417c80edae3f211ec38dae4a58c4a647.tar.gz | |
QPID-6021: prevent protons internal output buffer growing too large
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 |
2 files changed, 11 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index e08b387e85..3bee64b9d1 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -89,7 +89,8 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types:: readHeader(false), haveOutput(false), state(DISCONNECTED), - codecAdapter(*this) + codecAdapter(*this), + notifyOnWrite(false) { // Concatenate all known URLs into a single URL, get rid of duplicate addresses. sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? @@ -408,6 +409,13 @@ void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::share qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); SenderContext::Delivery* delivery(0); + while (pn_transport_pending(engine) > 65536) { + QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); + notifyOnWrite = true; + wakeupDriver(); + wait(ssn, snd); + notifyOnWrite = false; + } while (!snd->send(message, &delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity @@ -758,6 +766,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) if (n > 0) { QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; + if (notifyOnWrite) lock.notifyAll(); return n; } else if (n == PN_ERR) { throw MessagingException(QPID_MSG("Error on output: " << getError())); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 651cb736fd..12f3c21e0a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -154,6 +154,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag } state; std::auto_ptr<Sasl> sasl; CodecAdapter codecAdapter; + bool notifyOnWrite; void check(); bool checkDisconnected(); |
