diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionHandler.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 209 |
1 files changed, 31 insertions, 178 deletions
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 2ec0988fc0..2f09c6b5ac 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -21,12 +21,6 @@ #include "SessionHandler.h" #include "SessionState.h" #include "Connection.h" -#include "ConnectionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/framing/all_method_bodies.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -38,11 +32,9 @@ using namespace std; using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) - : InOutHandler(0, &out), - connection(c), channel(ch, &c.getOutput()), - proxy(out), // Via my own handleOut() for L2 data. - peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) + : amqp_0_10::SessionHandler(&c.getOutput(), ch), + connection(c), + proxy(out) {} SessionHandler::~SessionHandler() {} @@ -52,191 +44,52 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is attached if session != 0 - AMQMethodBody* m = f.getBody()->getMethod(); - try { - if (ignoring && !(m && m->isA<SessionDetachedBody>())) { - return; - } - if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - //frame was a valid session control and has been handled - return; - } else if (session.get()) { - //we are attached and frame was not a session control so it is for upper layers - session->handle(f); - } else if (m && m->isA<SessionDetachedBody>()) { - handleDetach(); - connection.closeChannel(channel.get()); - } else { - throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); - } - }catch(const ChannelException& e){ - QPID_LOG(error, "Session detached due to: " << e.what()); - peerSession.detached(name, e.code); +void SessionHandler::channelException(uint16_t, const std::string&) { handleDetach(); - connection.closeChannel(channel.get()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const std::exception& e){ - connection.close(501, e.what(), classId(m), methodId(m)); - } } -bool SessionHandler::isValid(AMQMethodBody* m) { - return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>(); -} - -void SessionHandler::handleOut(AMQFrame& f) { - channel.handle(f); // Send it. - if (session->sent(f)) - peerSession.flush(false, false, true); -} - -void SessionHandler::assertAttached(const char* method) const { - if (!session.get()) { - std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; - throw NotAttachedException( - QPID_MSG(method << " failed: No session for channel " - << getChannel())); - } -} - -void SessionHandler::assertClosed(const char* method) const { - if (session.get()) - throw SessionBusyException( - QPID_MSG(method << " failed: channel " << channel.get() - << " is already open.")); +void SessionHandler::connectionException(uint16_t code, const std::string& msg) { + connection.close(code, msg, 0, 0); } ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -//new methods: -void SessionHandler::attach(const std::string& _name, bool /*force*/) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - //TODO: need to revise session manager to support resume as well - assertClosed("attach"); - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.attached(name); - peerSession.commandPoint(session->nextOut, 0); -} - -void SessionHandler::attached(const std::string& _name) -{ - name = _name;//TODO: this should be used in conjunction with - //userid for connection as sessions identity - session.reset(new SessionState(0, this, 0, 0, name)); - peerSession.commandPoint(session->nextOut, 0); -} +const ConnectionState& SessionHandler::getConnection() const { return connection; } -void SessionHandler::detach(const std::string& name) -{ - assertAttached("detach"); - peerSession.detached(name, session::NORMAL); - handleDetach(); +void SessionHandler::handleDetach() { + amqp_0_10::SessionHandler::handleDetach(); assert(&connection.getChannel(channel.get()) == this); + if (session.get()) + connection.getBroker().getSessionManager().detach(session); + assert(!session.get()); connection.closeChannel(channel.get()); } -void SessionHandler::detached(const std::string& name, uint8_t code) -{ - ignoring = false; - handleDetach(); - if (code) { - //no error - } else { - //error occured - QPID_LOG(warning, "Received session.closed: "<< name << " " << code); - } - connection.closeChannel(channel.get()); -} - -void SessionHandler::handleDetach() -{ - if (session.get()) { - session->detach(); - session.reset(); - } -} - -void SessionHandler::requestDetach() -{ - //TODO: request timeout when python can handle it - //peerSession.requestTimeout(0); - ignoring = true; - peerSession.detach(name); -} - -void SessionHandler::requestTimeout(uint32_t t) -{ - session->setTimeout(t); - peerSession.timeout(t); -} - -void SessionHandler::timeout(uint32_t t) -{ - session->setTimeout(t); -} - -void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) -{ - if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); - - session->nextIn = id; -} - -void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) -{ - if (!commands.empty() || fragments.size()) { - throw NotImplementedException("Session resumption not yet supported"); - } +void SessionHandler::setState(const std::string& name, bool force) { + assert(!session.get()); + SessionId id(connection.getUserId(), name); + session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) -{ - //don't really care too much about this yet -} +FrameHandler* SessionHandler::getInHandler() { return session.get(); } +qpid::SessionState* SessionHandler::getState() { return session.get(); } -void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) -{ - session->complete(commands); - if (timelyReply) { - peerSession.knownCompleted(session->knownCompleted); - session->knownCompleted.clear(); - } +void SessionHandler::readyToSend() { + if (session.get()) session->readyToSend(); } -void SessionHandler::knownCompleted(const framing::SequenceSet& commands) -{ - session->completed.remove(commands); -} - -void SessionHandler::flush(bool expected, bool confirmed, bool completed) -{ - if (expected) { - peerSession.expected(SequenceSet(session->nextIn), Array()); - } - if (confirmed) { - peerSession.confirmed(session->completed, Array()); - } - if (completed) { - peerSession.completed(session->completed, true); - } -} - - -void SessionHandler::sendCompletion() -{ - peerSession.completed(session->completed, true); +// TODO aconway 2008-05-12: hacky - handle attached for bridge clients. +// We need to integrate the client code so we can run a real client +// in the bridge. +// +void SessionHandler::attached(const std::string& name) { + if (session.get()) + checkName(name); + else { + SessionId id(connection.getUserId(), name); + SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); } - -void SessionHandler::gap(const framing::SequenceSet& /*commands*/) -{ - throw NotImplementedException("gap not yet supported"); } }} // namespace qpid::broker |