summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-29 23:27:40 +0000
committerAlan Conway <aconway@apache.org>2007-08-29 23:27:40 +0000
commite183227707d150b1f42e750df0e90cd7dac8744e (patch)
treea9156083c1890852c2d4013d4a856f9f28762946 /cpp/src/qpid/broker
parent7422e57391a89bc2493cba18ca2ef0a84fec7baa (diff)
downloadqpid-python-e183227707d150b1f42e750df0e90cd7dac8744e.tar.gz
* src/qpid/broker/Session.h, .cpp: Session holds all state of a session including
handlers created for that session. Session is not directly associated with a channel. * src/qpid/broker/SessionAdapter.h, .cpp: SessionAdapter is bound to a channel managed by the Connection. It can be attached to and detatched from a Session. * src/qpid/broker/Connection.cpp, .h: Use SessionAdapter. * src/qpid/framing/Handler.h: Removed use of shared_ptr. Handlers belong either to a Session or a Connection and are destroyed with it. * src/qpid/framing/InputHandler.h, OutputHandler.h: Both now inherit from FrameHandler and can be used as FrameHandlers. Intermediate step to removing them entirely. * src/qpid/broker/ConnectionAdapter.h: * src/qpid/client/ConnectionHandler.h: * src/qpid/framing/ChannelAdapter.cpp, .h: Minor changes required by Handler changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp26
-rw-r--r--cpp/src/qpid/broker/Connection.h7
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.cpp16
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h39
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp9
-rw-r--r--cpp/src/qpid/broker/Session.cpp40
-rw-r--r--cpp/src/qpid/broker/Session.h60
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp69
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h48
9 files changed, 186 insertions, 128 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index f082c5cdb6..08d5ba0ab3 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -28,6 +28,8 @@
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
+#include <boost/utility/in_place_factory.hpp>
+
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -50,11 +52,12 @@ void Connection::received(framing::AMQFrame& frame){
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
+ // FIXME aconway 2007-08-29: review shutdown, not more shared_ptr.
+ // OLD COMMENT:
// Assign handler to new shared_ptr, as it may be erased
// from the map by handle() if frame is a ChannelClose.
//
- FrameHandler::Chain handler=getChannel((frame.getChannel())).in;
- handler->handle(frame);
+ getChannel((frame.getChannel())).in(frame);
}
}
@@ -97,15 +100,16 @@ void Connection::closeChannel(uint16_t id) {
FrameHandler::Chains& Connection::getChannel(ChannelId id) {
- ChannelMap::iterator i = channels.find(id);
- if (i == channels.end()) {
- FrameHandler::Chains chains(
- new SemanticHandler(id, *this),
- new OutputHandlerFrameHandler(*out));
- broker.update(id, chains);
- i = channels.insert(ChannelMap::value_type(id, chains)).first;
- }
- return i->second;
+ // FIXME aconway 2007-08-29: Assuming session on construction,
+ // move this to SessionAdapter::open.
+ boost::optional<SessionAdapter>& ch = channels[id];
+ if (!ch) {
+ ch = boost::in_place(boost::ref(*this), id); // FIXME aconway 2007-08-29:
+ assert(ch->getSession());
+ broker.update(id, *ch->getSession());
+ }
+ assert(ch->getSession());
+ return *ch->getSession();
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index b552267452..08beb0a3ea 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -38,6 +38,9 @@
#include "qpid/Exception.h"
#include "BrokerChannel.h"
#include "ConnectionAdapter.h"
+#include "SessionAdapter.h"
+
+#include <boost/optional.hpp>
namespace qpid {
namespace broker {
@@ -82,8 +85,8 @@ class Connection : public sys::ConnectionInputHandler,
void closed();
private:
- typedef std::map<framing::ChannelId, framing::FrameHandler::Chains> ChannelMap;
-
+ // Use boost::optional to allow default-constructed uninitialized entries in the map.
+ typedef std::map<framing::ChannelId, boost::optional<SessionAdapter> >ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
framing::ProtocolVersion version;
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp
index 175f57df7d..7672daed10 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.cpp
+++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp
@@ -66,7 +66,7 @@ framing::ProtocolVersion ConnectionAdapter::getVersion() const
void ConnectionAdapter::handle(framing::AMQFrame& frame)
{
- getHandlers().in->handle(frame);
+ getHandlers().in(frame);
}
ConnectionAdapter::ConnectionAdapter(Connection& connection)
@@ -74,27 +74,27 @@ ConnectionAdapter::ConnectionAdapter(Connection& connection)
handler = std::auto_ptr<Handler>(new Handler(connection, *this));
}
-Handler::Handler(Connection& c, ConnectionAdapter& a) :
+ConnectionAdapter::Handler:: Handler(Connection& c, ConnectionAdapter& a) :
proxy(a), client(proxy.getConnection()), connection(c) {}
-void Handler::startOk(const FieldTable& /*clientProperties*/,
+void ConnectionAdapter::Handler::startOk(const FieldTable& /*clientProperties*/,
const string& /*mechanism*/,
const string& /*response*/, const string& /*locale*/)
{
client.tune(framing::CHANNEL_MAX, connection.getFrameMax(), connection.getHeartbeat());
}
-void Handler::secureOk(const string& /*response*/){}
+void ConnectionAdapter::Handler::secureOk(const string& /*response*/){}
-void Handler::tuneOk(uint16_t /*channelmax*/,
+void ConnectionAdapter::Handler::tuneOk(uint16_t /*channelmax*/,
uint32_t framemax, uint16_t heartbeat)
{
connection.setFrameMax(framemax);
connection.setHeartbeat(heartbeat);
}
-void Handler::open(const string& /*virtualHost*/,
+void ConnectionAdapter::Handler::open(const string& /*virtualHost*/,
const string& /*capabilities*/, bool /*insist*/)
{
string knownhosts;
@@ -102,13 +102,13 @@ void Handler::open(const string& /*virtualHost*/,
}
-void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
+void ConnectionAdapter::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
client.closeOk();
connection.getOutput().close();
}
-void Handler::closeOk(){
+void ConnectionAdapter::Handler::closeOk(){
connection.getOutput().close();
}
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 9aa3d130e8..e3102faf59 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -35,12 +35,29 @@ namespace qpid {
namespace broker {
class Connection;
-struct Handler;
class ConnectionAdapter : public framing::ChannelAdapter, public framing::AMQP_ServerOperations
{
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+ {
+ framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Connection client;
+ Connection& connection;
+
+ Handler(Connection& connection, ConnectionAdapter& adapter);
+ void startOk(const qpid::framing::FieldTable& clientProperties,
+ const std::string& mechanism, const std::string& response,
+ const std::string& locale);
+ void secureOk(const std::string& response);
+ void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void open(const std::string& virtualHost,
+ const std::string& capabilities, bool insist);
+ void close(uint16_t replyCode, const std::string& replyText,
+ uint16_t classId, uint16_t methodId);
+ void closeOk();
+ };
std::auto_ptr<Handler> handler;
-public:
+ public:
ConnectionAdapter(Connection& connection);
void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
@@ -74,24 +91,6 @@ public:
framing::ProtocolVersion getVersion() const;
};
-struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
-{
- framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Connection client;
- Connection& connection;
-
- Handler(Connection& connection, ConnectionAdapter& adapter);
- void startOk(const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const std::string& response);
- void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
- void open(const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(uint16_t replyCode, const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk();
-};
}}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 09ab8ec465..b259aa6b8f 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -131,10 +131,7 @@ void MessageDelivery::deliver(Message::shared_ptr msg,
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
t->sendMethod(msg, channel, id);
- boost::shared_ptr<FrameHandler> handler = channel.getHandlers().out;
- //send header
- msg->sendHeader(*handler, channel.getId(), framesize);
-
- //send content
- msg->sendContent(*handler, channel.getId(), framesize);
+ FrameHandler& handler = channel.getHandlers().out;
+ msg->sendHeader(handler, channel.getId(), framesize);
+ msg->sendContent(handler, channel.getId(), framesize);
}
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp
new file mode 100644
index 0000000000..2940c8cccb
--- /dev/null
+++ b/cpp/src/qpid/broker/Session.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Session.h"
+#include "SemanticHandler.h"
+#include "SessionAdapter.h"
+
+namespace qpid {
+namespace broker {
+
+Session::Session(SessionAdapter& a, uint32_t t)
+ : adapter(&a), timeout(t)
+{
+ assert(adapter);
+ // FIXME aconway 2007-08-29: handler to get Session, not connection.
+ handlers.push_back(new SemanticHandler(adapter->getChannel(), adapter->getConnection()));
+ in = &handlers[0];
+ out = &adapter->getConnection().getOutput();
+}
+
+
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
new file mode 100644
index 0000000000..927d197390
--- /dev/null
+++ b/cpp/src/qpid/broker/Session.h
@@ -0,0 +1,60 @@
+#ifndef QPID_BROKER_SESSION_H
+#define QPID_BROKER_SESSION_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameHandler.h"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace qpid {
+namespace broker {
+
+class SessionAdapter;
+
+/**
+ * Session holds the state of an open session, whether attached to a
+ * channel or suspended. It also holds the handler chains associated
+ * with the session.
+ */
+class Session : public framing::FrameHandler::Chains,
+ private boost::noncopyable
+{
+ public:
+ Session(SessionAdapter&, uint32_t timeout);
+
+ /** Returns 0 if this session is not currently attached */
+ SessionAdapter* getAdapter() { return adapter; }
+ const SessionAdapter* getAdapter() const { return adapter; }
+
+ uint32_t getTimeout() const { return timeout; }
+
+ private:
+ SessionAdapter* adapter;
+ uint32_t timeout;
+ boost::ptr_vector<framing::FrameHandler> handlers;
+};
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 2c471ff098..44245f9689 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -24,69 +24,22 @@ namespace qpid {
namespace broker {
using namespace framing;
-SessionAdapter::~SessionAdapter() {
+SessionAdapter::SessionAdapter(Connection& c, ChannelId ch)
+ : connection(c), channel(ch)
+{
+ // FIXME aconway 2007-08-29: When we handle session commands,
+ // do this on open.
+ session.reset(new Session(*this, 0));
}
-SessionAdapter::SessionAdapter() {
- // FIXME aconway 2007-08-27: Implement
-}
-
-void SessionAdapter::visit(const SessionOpenBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-void SessionAdapter::visit(const SessionAckBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionAttachedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
+SessionAdapter::~SessionAdapter() {}
-void SessionAdapter::visit(const SessionCloseBody&) {
- // FIXME aconway 2007-08-27: Implement
+void SessionAdapter::handle(AMQFrame& f) {
+ // FIXME aconway 2007-08-29: handle session commands here, forward
+ // other frames.
+ session->in(f);
}
-void SessionAdapter::visit(const SessionClosedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionDetachedBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionFlowBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionFlowOkBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionHighWaterMarkBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionResumeBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionSolicitAckBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
-
-void SessionAdapter::visit(const SessionSuspendBody&) {
- // FIXME aconway 2007-08-27: Implement
-}
-
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index a190a7f2b7..237e2c8b64 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -22,43 +22,45 @@
*
*/
-#include "qpid/framing/FrameDefaultVisitor.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/broker/SuspendedSessions.h"
+#include "qpid/broker/Session.h"
+#include "qpid/framing/amqp_types.h"
namespace qpid {
namespace broker {
+class Connection;
+class Session;
+
/**
- * Session Handler: Handles frames arriving for a session.
- * Implements AMQP session class commands, forwards other traffic
- * to the next handler in the chain.
+ * A SessionAdapter is associated with each active channel. It
+ * receives incoming frames, handles session commands and manages the
+ * association between the channel and a session.
+ *
+ * SessionAdapters can be stored in a map by value.
*/
-class SessionAdapter : public framing::FrameVisitorHandler
+class SessionAdapter : public framing::FrameHandler
{
public:
- SessionAdapter();
+ SessionAdapter(Connection&, framing::ChannelId);
~SessionAdapter();
- protected:
- void visit(const framing::SessionAckBody&);
- void visit(const framing::SessionAttachedBody&);
- void visit(const framing::SessionCloseBody&);
- void visit(const framing::SessionClosedBody&);
- void visit(const framing::SessionDetachedBody&);
- void visit(const framing::SessionFlowBody&);
- void visit(const framing::SessionFlowOkBody&);
- void visit(const framing::SessionHighWaterMarkBody&);
- void visit(const framing::SessionOpenBody&);
- void visit(const framing::SessionResumeBody&);
- void visit(const framing::SessionSolicitAckBody&);
- void visit(const framing::SessionSuspendBody&);
+ /** Handle AMQP session methods, pass other frames to the session
+ * if there is one. Frames channel must be == getChannel()
+ */
+ void handle(framing::AMQFrame&);
+
+ /** Returns 0 if not attached to a session */
+ Session* getSession() const { return session.get(); }
- using FrameDefaultVisitor::visit;
+ framing::ChannelId getChannel() const { return channel; }
+ Connection& getConnection() { return connection; }
+ const Connection& getConnection() const { return connection; }
private:
- SessionState state;
- SuspendedSessions* suspended;
+ Connection& connection;
+ const framing::ChannelId channel;
+ shared_ptr<Session> session;
};
}} // namespace qpid::broker