From 761e10501fe5ea51f9d8c40d9a200ae27193ab23 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 31 Aug 2007 20:51:22 +0000 Subject: * Summary: - Moved BrokerChannel functionality into Session. - Moved ChannelHandler methods handling into SessionAdapter. - Updated all handlers to use session. (We're still using AMQP channel methods in SessionAdapter) Roles & responsibilities: Session: - represents an _open_ session, may be active or suspended. - ows all session state including handler chains. - attahced to SessionAdapter when active, not when suspended. SessionAdapter: - reprents the association of a channel with a session. - owned by Connection, kept in the session map. - channel open == SessionAdapter.getSessio() != 0 Anything that depends on attachment to a channel, connection or protocol should be in SessionAdpater. Anything that suvives a session suspend belongs in Session. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571575 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SessionAdapter.cpp | 115 +++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 7 deletions(-) (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp') diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 44245f9689..f9d352aa6a 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -19,27 +19,128 @@ */ #include "SessionAdapter.h" +#include "Session.h" +#include "Connection.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { using namespace framing; +// FIXME aconway 2007-08-31: the SessionAdapter should create its +// private proxy directly on the connections out handler. +// Session/channel methods should not go thru the other layers. +// Need to get rid of ChannelAdapter and allow proxies to be created +// directly on output handlers. +// +framing::AMQP_ClientProxy& SessionAdapter::getProxy() { + return session->getProxy(); +} + SessionAdapter::SessionAdapter(Connection& c, ChannelId ch) - : connection(c), channel(ch) + : connection(c), channel(ch), ignoring(false) { - // FIXME aconway 2007-08-29: When we handle session commands, - // do this on open. - session.reset(new Session(*this, 0)); + in = this; + out = &c.getOutput(); } SessionAdapter::~SessionAdapter() {} +namespace { +ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } +MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } +} // namespace void SessionAdapter::handle(AMQFrame& f) { - // FIXME aconway 2007-08-29: handle session commands here, forward - // other frames. - session->in(f); + // Note on channel states: a channel is open if session != 0. A + // channel that is closed (session == 0) can be in the "ignoring" + // state. This is a temporary state after we have sent a channel + // exception, where extra frames might arrive that should be + // ignored. + // + AMQMethodBody* m=f.getMethod(); + try { + if (m && m->invoke(static_cast(this))) + return; + else if (session) + session->in(f); + else if (!ignoring) + throw ChannelErrorException( + QPID_MSG("Channel " << channel << " is not open")); + } catch(const ChannelException& e){ + getProxy().getChannel().close( + e.code, e.toString(), classId(m), methodId(m)); + session.reset(); + ignoring=true; // Ignore trailing frames sent by client. + }catch(const ConnectionException& e){ + connection.close(e.code, e.what(), classId(m), methodId(m)); + }catch(const std::exception& e){ + connection.close( + framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); + } +} + +void SessionAdapter::assertOpen(const char* method) { + if (!session) + throw ChannelErrorException( + QPID_MSG(""<flow(active); + getProxy().getChannel().flowOk(active); +} + +void SessionAdapter::flowOk(bool /*active*/){} + +void SessionAdapter::close(uint16_t replyCode, + const string& replyText, + uint16_t classId, uint16_t methodId) +{ + // FIXME aconway 2007-08-31: Extend constants.h to map codes & ids + // to text names. + QPID_LOG(warning, "Received session.close("<