From 4d0136f757776a1860c0d08e9674cfd18529b586 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 13 Oct 2008 20:57:14 +0000 Subject: Reverted a small part of r703237 as it causes deadlocks under load. Session controls can _not_ be subject to bounds checking on the queue of outgoing frames as is done for commands. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@704245 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/SessionImpl.cpp | 18 ++++++++++++++++-- qpid/cpp/src/qpid/client/SessionImpl.h | 9 +++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 49dd97e324..2d0f83b894 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -59,7 +59,8 @@ SessionImpl::SessionImpl(const std::string& name, shared_ptr con connectionShared(conn), connectionWeak(conn), weakPtr(false), - proxy(out), + ioHandler(*this), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -423,10 +424,23 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread } void SessionImpl::handleOut(AMQFrame& frame) // user thread +{ + sendFrame(frame, true); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + //Note: this case is treated slightly differently that command + //frames sent by application; session controls should not be + //blocked by bounds checking on the outgoing frame queue. + sendFrame(frame, false); +} + +void SessionImpl::sendFrame(AMQFrame& frame, bool canBlock) { boost::shared_ptr c = connectionWeak.lock(); if (c) { - c->expand(frame.encodedSize(), true); + c->expand(frame.encodedSize(), canBlock); channel.handle(frame); } } diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 54ace77254..d56566ec14 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -137,6 +137,14 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + /** + * Sends session controls. This case is treated slightly + * differently than command frames sent by the application via + * handleOut(); session controlsare not subject to bounds checking + * on the outgoing frame queue. + */ + void proxyOut(framing::AMQFrame& frame); + void sendFrame(framing::AMQFrame& frame, bool canBlock); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); @@ -185,6 +193,7 @@ private: boost::weak_ptr connectionWeak; bool weakPtr; + framing::FrameHandler::MemFunRef ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; -- cgit v1.2.1