From 8b82aef0397d65de0c7278476e4f409fcc636306 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 28 Sep 2007 16:21:34 +0000 Subject: * src/tests/ClientSessionTest.cpp: Suspend/resume tests. * broker/SessionManager.cpp, broker/SessionHandler.cpp: Implement suspend/resume * client/ScopedAssociation.h, SessionCore.h, SessionHandler.h: Simplified relationships. - Removed ScopedAssociation. - SessionHandler: is now a member of SessionCore. - SessionCore: shared_ptr ownership by Session(s) and ConnectionImpl. - Using framing::FrameHandler interfaces. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@580403 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/SessionHandler.cpp | 121 +++++++++++++++++---------------- 1 file changed, 64 insertions(+), 57 deletions(-) (limited to 'cpp/src/qpid/client/SessionHandler.cpp') diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp index 93e628ab34..d3b04e5356 100644 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ b/cpp/src/qpid/client/SessionHandler.cpp @@ -22,31 +22,44 @@ #include "SessionHandler.h" #include "qpid/framing/amqp_framing.h" #include "qpid/framing/all_method_bodies.h" +#include "qpid/client/SessionCore.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -SessionHandler::SessionHandler() : StateManager(CLOSED), id(0) {} +namespace { +// TODO aconway 2007-09-28: hack till we have multi-version support. +ProtocolVersion version; +} + +SessionHandler::SessionHandler(SessionCore& parent) + : StateManager(CLOSED), core(parent) {} + +SessionHandler::~SessionHandler() {} -void SessionHandler::incoming(AMQFrame& frame) +void SessionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (getState() == OPEN) { - SessionClosedBody* closeBody= + core.checkClosed(); + SessionClosedBody* closedBody= dynamic_cast(body->getMethod()); - if (closeBody) { - setState(CLOSED_BY_PEER); - code = closeBody->getReplyCode(); - text = closeBody->getReplyText(); - if (onClose) { - onClose(closeBody->getReplyCode(), closeBody->getReplyText()); - } + if (closedBody) { + closed(); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } else { try { - in(frame); - }catch(ChannelException& e){ - closed(e.code, e.toString()); + next->handle(frame); + } + catch(const ChannelException& e){ + QPID_LOG(error, "Channel exception:" << e.what()); + closed(); + AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); + core.out(f); + core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); } } } else { @@ -57,69 +70,63 @@ void SessionHandler::incoming(AMQFrame& frame) } } -void SessionHandler::outgoing(AMQFrame& frame) -{ - if (getState() == OPEN) { - frame.setChannel(id); - out(frame); - } else if (getState() == CLOSED) { - throw Exception(QPID_MSG("Channel not open, can't send " << frame)); - } else if (getState() == CLOSED_BY_PEER) { - throw ChannelException(code, text); - } -} - -void SessionHandler::open(uint16_t _id) +void SessionHandler::attach(const AMQMethodBody& command) { - id = _id; - setState(OPENING); - // FIXME aconway 2007-09-19: Need to get this from API. - AMQFrame f(id, SessionOpenBody(version, 0)); - out(f); - + AMQFrame f(0, command); + core.out(f); std::set states; states.insert(OPEN); - states.insert(CLOSED_BY_PEER); + states.insert(CLOSED); waitFor(states); - if (getState() != OPEN) { - throw Exception("Failed to open channel."); - } + if (getState() != OPEN) + throw Exception(QPID_MSG("Failed to attach session to channel "<isA()) { - setState(OPEN); - } else { - throw ConnectionException(504, "Channel not opened."); - } - break; + case OPENING: { + SessionAttachedBody* attached = dynamic_cast(method); + if (attached) { + core.setId(attached->getSessionId()); + setState(OPEN); + } else + throw ChannelErrorException(); + break; + } case CLOSING: - if (method->isA()) { - setState(CLOSED); - } //else just ignore it + if (method->isA() || + method->isA()) + closed(); break; + case CLOSED: - throw ConnectionException(504, "Channel is closed."); + throw ChannelErrorException(); + default: - throw Exception("Unexpected state encountered in SessionHandler!"); + assert(0); + throw InternalErrorException(QPID_MSG("Internal Error.")); } } + -- cgit v1.2.1