diff options
| author | Gordon Sim <gsim@apache.org> | 2010-05-27 18:09:13 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2010-05-27 18:09:13 +0000 |
| commit | c95b2615abf0883f7d92aad73138a4dda14e1311 (patch) | |
| tree | 7eb2195eab5c7ecafab17e553635a434b20dee64 /cpp/src/qpid/client | |
| parent | 91491e533896be58438ba2dc0e199461b4320653 (diff) | |
| download | qpid-python-c95b2615abf0883f7d92aad73138a4dda14e1311.tar.gz | |
QPID-2631: For blocking Bounds::expand() calls, only increase the current count when there is space. In SessionImpl::send() expand bounds before queueing frame. Expand bounds for all frames sent (including connection frames and cluster specific frames).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@948936 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/Bounds.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionHandler.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 |
5 files changed, 21 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/Bounds.cpp b/cpp/src/qpid/client/Bounds.cpp index abb983a62e..cc2577d5fc 100644 --- a/cpp/src/qpid/client/Bounds.cpp +++ b/cpp/src/qpid/client/Bounds.cpp @@ -33,19 +33,19 @@ Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} bool Bounds::expand(size_t sizeRequired, bool block) { if (!max) return true; Waitable::ScopedLock l(lock); - current += sizeRequired; if (block) { Waitable::ScopedWait w(lock); - while (current > max) + while (current + sizeRequired > max) lock.wait(); } + current += sizeRequired; return current <= max; } void Bounds::reduce(size_t size) { if (!max || size == 0) return; Waitable::ScopedLock l(lock); - if (current == 0) return; + assert(current >= size); current -= std::min(size, current); if (current < max && lock.hasWaiters()) { lock.notifyAll(); diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 9d68448d9d..6aea4c4acf 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,7 @@ #include "qpid/client/ConnectionHandler.h" #include "qpid/client/SaslFactory.h" +#include "qpid/client/Bounds.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" @@ -70,8 +71,15 @@ CloseCode ConnectionHandler::convert(uint16_t replyCode) } } -ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), +ConnectionHandler::Adapter::Adapter(ConnectionHandler& h, Bounds& b) : handler(h), bounds(b) {} +void ConnectionHandler::Adapter::handle(framing::AMQFrame& f) +{ + bounds.expand(f.encodedSize(), false); + handler.out(f); +} + +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v, Bounds& b) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this, b), proxy(outHandler), errorCode(CLOSE_CODE_NORMAL), version(v) { insist = true; diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 5f4b454f53..61709db174 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -47,6 +47,8 @@ struct SecuritySettings; namespace client { +class Bounds; + class ConnectionHandler : private StateManager, public ConnectionSettings, public ChainableFrameHandler, @@ -60,9 +62,10 @@ class ConnectionHandler : private StateManager, class Adapter : public framing::FrameHandler { ConnectionHandler& handler; + Bounds& bounds; public: - Adapter(ConnectionHandler& h) : handler(h) {} - void handle(framing::AMQFrame& f) { handler.out(f); } + Adapter(ConnectionHandler& h, Bounds& bounds); + void handle(framing::AMQFrame& f); }; Adapter outHandler; @@ -102,7 +105,7 @@ public: typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; typedef boost::function<const qpid::sys::SecuritySettings*()> GetSecuritySettings; - ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&); + ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&, Bounds&); void received(framing::AMQFrame& f) { incoming(f); } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d5fe7489d3..99f4411977 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -182,7 +182,7 @@ boost::shared_ptr<ConnectionImpl> ConnectionImpl::create(framing::ProtocolVersio ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), - handler(settings, v), + handler(settings, v, *this), version(v), nextChannel(1), shutdownComplete(false), diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b7ff4307b6..b507625b11 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -510,8 +510,8 @@ void SessionImpl::proxyOut(AMQFrame& frame) // network thread void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock) { - channel.handle(frame); connection->expand(frame.encodedSize(), canBlock); + channel.handle(frame); } void SessionImpl::deliver(AMQFrame& frame) // network thread |
