summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-31 20:51:22 +0000
committerAlan Conway <aconway@apache.org>2007-08-31 20:51:22 +0000
commit761e10501fe5ea51f9d8c40d9a200ae27193ab23 (patch)
treee2d4bdfdc0b9383661947378a1f183387501637c /cpp/src/qpid/broker/SemanticHandler.cpp
parent655b3b5806bafdd784f6a9c242e26341bd6aeccc (diff)
downloadqpid-python-761e10501fe5ea51f9d8c40d9a200ae27193ab23.tar.gz
* Summary:
- Moved BrokerChannel functionality into Session. - Moved ChannelHandler methods handling into SessionAdapter. - Updated all handlers to use session. (We're still using AMQP channel methods in SessionAdapter) Roles & responsibilities: Session: - represents an _open_ session, may be active or suspended. - ows all session state including handler chains. - attahced to SessionAdapter when active, not when suspended. SessionAdapter: - reprents the association of a channel with a session. - owned by Connection, kept in the session map. - channel open == SessionAdapter.getSessio() != 0 Anything that depends on attachment to a channel, connection or protocol should be in SessionAdpater. Anything that suvives a session suspend belongs in Session. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp114
1 files changed, 42 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 5e9106c1dd..f1bdc68899 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,25 +20,29 @@
*/
#include "SemanticHandler.h"
-
-#include "boost/format.hpp"
+#include "Session.h"
+#include "SessionAdapter.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "qpid/framing/ChannelCloseOkBody.h"
+#include "Connection.h"
+#include "Session.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/ChannelOpenBody.h"
#include "qpid/framing/InvocationVisitor.h"
+#include <boost/format.hpp>
+
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
- connection(c), channel(c, *this, id)
+SemanticHandler::SemanticHandler(Session& s) :
+ session(s),
+ connection(s.getAdapter()->getConnection()),
+ adapter(s, static_cast<ChannelAdapter&>(*this))
{
- init(id, connection.getOutput(), connection.getVersion());
- adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
+ init(s.getAdapter()->getChannel(), s.out, 0);
}
void SemanticHandler::handle(framing::AMQFrame& frame)
@@ -60,35 +64,18 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
//open. execute it (i.e. out-of order execution with respect to
//the command id sequence) or queue it up?
- try{
-
- TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
-
- switch(track) {
- case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler
- handleL2(frame.castBody<AMQMethodBody>());
- break;
- case EXECUTION_CONTROL_TRACK:
- handleL3(frame.castBody<AMQMethodBody>());
- break;
- case MODEL_COMMAND_TRACK:
- if (!isOpen()) {
- throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
- }
- handleCommand(frame.castBody<AMQMethodBody>());
- break;
- case MODEL_CONTENT_TRACK:
- handleContent(frame);
- break;
- }
+ TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
- }catch(const ChannelException& e){
- adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
- connection.closeChannel(getId());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame));
- }catch(const std::exception& e){
- connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame));
+ switch(track) {
+ case EXECUTION_CONTROL_TRACK:
+ handleL3(frame.getMethod());
+ break;
+ case MODEL_COMMAND_TRACK:
+ handleCommand(frame.getMethod());
+ break;
+ case MODEL_CONTENT_TRACK:
+ handleContent(frame);
+ break;
}
}
@@ -99,13 +86,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- channel.ackCumulative(mark.getValue());
+ session.ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -141,7 +128,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
++(incoming.lwm);
- InvocationVisitor v(adapter.get());
+ InvocationVisitor v(&adapter);
method->accept(v);
//TODO: need to account for async store operations and interleaving
++(incoming.hwm);
@@ -153,17 +140,6 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
}
}
-void SemanticHandler::handleL2(framing::AMQMethodBody* method)
-{
- if(!method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str());
- }
- } else {
- method->invoke(adapter->getChannelHandler());
- }
-}
-
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
if (!method->invoke(this)) {
@@ -181,16 +157,16 @@ void SemanticHandler::handleContent(AMQFrame& frame)
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
msg->setPublisher(&connection);
- channel.handle(msg);
+ session.handle(msg);
msgBuilder.end();
//TODO: need to account for async store operations and interleaving
++(incoming.hwm);
}
}
-bool SemanticHandler::isOpen() const
-{
- return channel.isOpen();
+bool SemanticHandler::isOpen() const {
+ // FIXME aconway 2007-08-30: remove.
+ return true;
}
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
@@ -210,45 +186,39 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_
void SemanticHandler::send(const AMQBody& body)
{
Mutex::ScopedLock l(outLock);
- if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) {
- //temporary hack until channel management is moved to its own handler:
+ // FIXME aconway 2007-08-31: SessionAdapter should not send
+ // channel/session commands via the semantic handler, it should shortcut
+ // directly to its own output handler. That will make the CLASS_ID
+ // part of the test unnecessary.
+ //
+ if (body.getMethod() &&
+ body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
+ {
++outgoing.hwm;
}
ChannelAdapter::send(body);
}
-uint16_t SemanticHandler::getClassId(const AMQFrame& frame)
-{
- return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0;
-}
-
-uint16_t SemanticHandler::getMethodId(const AMQFrame& frame)
-{
- return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0;
-}
-
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
{
//will be replaced by field in 0-10 frame header
uint8_t type = frame.getBody()->type();
uint16_t classId;
switch(type) {
- case METHOD_BODY:
+ case METHOD_BODY:
if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
return MODEL_CONTENT_TRACK;
}
classId = frame.castBody<AMQMethodBody>()->amqpClassId();
switch (classId) {
- case ChannelOpenBody::CLASS_ID:
- return SESSION_CONTROL_TRACK;
- case ExecutionCompleteBody::CLASS_ID:
+ case ExecutionCompleteBody::CLASS_ID:
return EXECUTION_CONTROL_TRACK;
}
return MODEL_COMMAND_TRACK;
- case HEADER_BODY:
- case CONTENT_BODY:
+ case HEADER_BODY:
+ case CONTENT_BODY:
return MODEL_CONTENT_TRACK;
}
throw Exception("Could not determine track");