diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-29 23:27:40 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-29 23:27:40 +0000 |
| commit | e183227707d150b1f42e750df0e90cd7dac8744e (patch) | |
| tree | a9156083c1890852c2d4013d4a856f9f28762946 /cpp/src/qpid/broker | |
| parent | 7422e57391a89bc2493cba18ca2ef0a84fec7baa (diff) | |
| download | qpid-python-e183227707d150b1f42e750df0e90cd7dac8744e.tar.gz | |
* src/qpid/broker/Session.h, .cpp: Session holds all state of a session including
handlers created for that session. Session is not directly associated with a channel.
* src/qpid/broker/SessionAdapter.h, .cpp: SessionAdapter is bound to a channel
managed by the Connection. It can be attached to and detatched from a Session.
* src/qpid/broker/Connection.cpp, .h: Use SessionAdapter.
* src/qpid/framing/Handler.h: Removed use of shared_ptr. Handlers belong
either to a Session or a Connection and are destroyed with it.
* src/qpid/framing/InputHandler.h, OutputHandler.h: Both now inherit from
FrameHandler and can be used as FrameHandlers. Intermediate step to removing
them entirely.
* src/qpid/broker/ConnectionAdapter.h:
* src/qpid/client/ConnectionHandler.h:
* src/qpid/framing/ChannelAdapter.cpp, .h:
Minor changes required by Handler changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Session.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Session.h | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 69 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 48 |
9 files changed, 186 insertions, 128 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index f082c5cdb6..08d5ba0ab3 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -28,6 +28,8 @@ #include "BrokerAdapter.h" #include "SemanticHandler.h" +#include <boost/utility/in_place_factory.hpp> + using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -50,11 +52,12 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { + // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr. + // OLD COMMENT: // Assign handler to new shared_ptr, as it may be erased // from the map by handle() if frame is a ChannelClose. // - FrameHandler::Chain handler=getChannel((frame.getChannel())).in; - handler->handle(frame); + getChannel((frame.getChannel())).in(frame); } } @@ -97,15 +100,16 @@ void Connection::closeChannel(uint16_t id) { FrameHandler::Chains& Connection::getChannel(ChannelId id) { - ChannelMap::iterator i = channels.find(id); - if (i == channels.end()) { - FrameHandler::Chains chains( - new SemanticHandler(id, *this), - new OutputHandlerFrameHandler(*out)); - broker.update(id, chains); - i = channels.insert(ChannelMap::value_type(id, chains)).first; - } - return i->second; + // FIXME aconway 2007-08-29: Assuming session on construction, + // move this to SessionAdapter::open. + boost::optional<SessionAdapter>& ch = channels[id]; + if (!ch) { + ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29: + assert(ch->getSession()); + broker.update(id, *ch->getSession()); + } + assert(ch->getSession()); + return *ch->getSession(); } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index b552267452..08beb0a3ea 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -38,6 +38,9 @@ #include "qpid/Exception.h" #include "BrokerChannel.h" #include "ConnectionAdapter.h" +#include "SessionAdapter.h" + +#include <boost/optional.hpp> namespace qpid { namespace broker { @@ -82,8 +85,8 @@ class Connection : public sys::ConnectionInputHandler, void closed(); private: - typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap; - + // Use boost::optional to allow default-constructed uninitialized entries in the map. + typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; framing::ProtocolVersion version; diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index 175f57df7d..7672daed10 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -66,7 +66,7 @@ framing::ProtocolVersion ConnectionAdapter::getVersion() const void ConnectionAdapter::handle(framing::AMQFrame& frame) { - getHandlers().in->handle(frame); + getHandlers().in(frame); } ConnectionAdapter::ConnectionAdapter(Connection& connection) @@ -74,27 +74,27 @@ ConnectionAdapter::ConnectionAdapter(Connection& connection) handler = std::auto_ptr<Handler>(new Handler(connection, *this)); } -Handler::Handler(Connection& c, ConnectionAdapter& a) : +ConnectionAdapter::Handler:: Handler(Connection& c, ConnectionAdapter& a) : proxy(a), client(proxy.getConnection()), connection(c) {} -void Handler::startOk(const FieldTable& /*clientProperties*/, +void ConnectionAdapter::Handler::startOk(const FieldTable& /*clientProperties*/, const string& /*mechanism*/, const string& /*response*/, const string& /*locale*/) { client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat()); } -void Handler::secureOk(const string& /*response*/){} +void ConnectionAdapter::Handler::secureOk(const string& /*response*/){} -void Handler::tuneOk(uint16_t /*channelmax*/, +void ConnectionAdapter::Handler::tuneOk(uint16_t /*channelmax*/, uint32_t framemax, uint16_t heartbeat) { connection.setFrameMax(framemax); connection.setHeartbeat(heartbeat); } -void Handler::open(const string& /*virtualHost*/, +void ConnectionAdapter::Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; @@ -102,13 +102,13 @@ void Handler::open(const string& /*virtualHost*/, } -void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, +void ConnectionAdapter::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { client.closeOk(); connection.getOutput().close(); } -void Handler::closeOk(){ +void ConnectionAdapter::Handler::closeOk(){ connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 9aa3d130e8..e3102faf59 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -35,12 +35,29 @@ namespace qpid { namespace broker { class Connection; -struct Handler; class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations { + struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler + { + framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Connection client; + Connection& connection; + + Handler(Connection& connection, ConnectionAdapter& adapter); + void startOk(const qpid::framing::FieldTable& clientProperties, + const std::string& mechanism, const std::string& response, + const std::string& locale); + void secureOk(const std::string& response); + void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void open(const std::string& virtualHost, + const std::string& capabilities, bool insist); + void close(uint16_t replyCode, const std::string& replyText, + uint16_t classId, uint16_t methodId); + void closeOk(); + }; std::auto_ptr<Handler> handler; -public: + public: ConnectionAdapter(Connection& connection); void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); @@ -74,24 +91,6 @@ public: framing::ProtocolVersion getVersion() const; }; -struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler -{ - framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Connection client; - Connection& connection; - - Handler(Connection& connection, ConnectionAdapter& adapter); - void startOk(const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const std::string& response); - void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); - void open(const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(uint16_t replyCode, const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(); -}; }} diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 09ab8ec465..b259aa6b8f 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -131,10 +131,7 @@ void MessageDelivery::deliver(Message::shared_ptr msg, boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); t->sendMethod(msg, channel, id); - boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out; - //send header - msg->sendHeader(*handler, channel.getId(), framesize); - - //send content - msg->sendContent(*handler, channel.getId(), framesize); + FrameHandler& handler = channel.getHandlers().out; + msg->sendHeader(handler, channel.getId(), framesize); + msg->sendContent(handler, channel.getId(), framesize); } diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp new file mode 100644 index 0000000000..2940c8cccb --- /dev/null +++ b/cpp/src/qpid/broker/Session.cpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Session.h" +#include "SemanticHandler.h" +#include "SessionAdapter.h" + +namespace qpid { +namespace broker { + +Session::Session(SessionAdapter& a, uint32_t t) + : adapter(&a), timeout(t) +{ + assert(adapter); + // FIXME aconway 2007-08-29: handler to get Session, not connection. + handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection())); + in = &handlers[0]; + out = &adapter->getConnection().getOutput(); +} + + + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h new file mode 100644 index 0000000000..927d197390 --- /dev/null +++ b/cpp/src/qpid/broker/Session.h @@ -0,0 +1,60 @@ +#ifndef QPID_BROKER_SESSION_H +#define QPID_BROKER_SESSION_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameHandler.h" + +#include <boost/ptr_container/ptr_vector.hpp> + +namespace qpid { +namespace broker { + +class SessionAdapter; + +/** + * Session holds the state of an open session, whether attached to a + * channel or suspended. It also holds the handler chains associated + * with the session. + */ +class Session : public framing::FrameHandler::Chains, + private boost::noncopyable +{ + public: + Session(SessionAdapter&, uint32_t timeout); + + /** Returns 0 if this session is not currently attached */ + SessionAdapter* getAdapter() { return adapter; } + const SessionAdapter* getAdapter() const { return adapter; } + + uint32_t getTimeout() const { return timeout; } + + private: + SessionAdapter* adapter; + uint32_t timeout; + boost::ptr_vector<framing::FrameHandler> handlers; +}; + +}} // namespace qpid::broker + + + +#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 2c471ff098..44245f9689 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -24,69 +24,22 @@ namespace qpid { namespace broker { using namespace framing; -SessionAdapter::~SessionAdapter() { +SessionAdapter::SessionAdapter(Connection& c, ChannelId ch) + : connection(c), channel(ch) +{ + // FIXME aconway 2007-08-29: When we handle session commands, + // do this on open. + session.reset(new Session(*this, 0)); } -SessionAdapter::SessionAdapter() { - // FIXME aconway 2007-08-27: Implement -} - -void SessionAdapter::visit(const SessionOpenBody&) { - // FIXME aconway 2007-08-27: Implement -} - -void SessionAdapter::visit(const SessionAckBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionAttachedBody&) { - // FIXME aconway 2007-08-27: Implement -} +SessionAdapter::~SessionAdapter() {} -void SessionAdapter::visit(const SessionCloseBody&) { - // FIXME aconway 2007-08-27: Implement +void SessionAdapter::handle(AMQFrame& f) { + // FIXME aconway 2007-08-29: handle session commands here, forward + // other frames. + session->in(f); } -void SessionAdapter::visit(const SessionClosedBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionDetachedBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionFlowBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionFlowOkBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionHighWaterMarkBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionResumeBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionSolicitAckBody&) { - // FIXME aconway 2007-08-27: Implement -} - - -void SessionAdapter::visit(const SessionSuspendBody&) { - // FIXME aconway 2007-08-27: Implement -} - }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index a190a7f2b7..237e2c8b64 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -22,43 +22,45 @@ * */ -#include "qpid/framing/FrameDefaultVisitor.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/broker/SuspendedSessions.h" +#include "qpid/broker/Session.h" +#include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { +class Connection; +class Session; + /** - * Session Handler: Handles frames arriving for a session. - * Implements AMQP session class commands, forwards other traffic - * to the next handler in the chain. + * A SessionAdapter is associated with each active channel. It + * receives incoming frames, handles session commands and manages the + * association between the channel and a session. + * + * SessionAdapters can be stored in a map by value. */ -class SessionAdapter : public framing::FrameVisitorHandler +class SessionAdapter : public framing::FrameHandler { public: - SessionAdapter(); + SessionAdapter(Connection&, framing::ChannelId); ~SessionAdapter(); - protected: - void visit(const framing::SessionAckBody&); - void visit(const framing::SessionAttachedBody&); - void visit(const framing::SessionCloseBody&); - void visit(const framing::SessionClosedBody&); - void visit(const framing::SessionDetachedBody&); - void visit(const framing::SessionFlowBody&); - void visit(const framing::SessionFlowOkBody&); - void visit(const framing::SessionHighWaterMarkBody&); - void visit(const framing::SessionOpenBody&); - void visit(const framing::SessionResumeBody&); - void visit(const framing::SessionSolicitAckBody&); - void visit(const framing::SessionSuspendBody&); + /** Handle AMQP session methods, pass other frames to the session + * if there is one. Frames channel must be == getChannel() + */ + void handle(framing::AMQFrame&); + + /** Returns 0 if not attached to a session */ + Session* getSession() const { return session.get(); } - using FrameDefaultVisitor::visit; + framing::ChannelId getChannel() const { return channel; } + Connection& getConnection() { return connection; } + const Connection& getConnection() const { return connection; } private: - SessionState state; - SuspendedSessions* suspended; + Connection& connection; + const framing::ChannelId channel; + shared_ptr<Session> session; }; }} // namespace qpid::broker |
