diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-31 20:51:22 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-31 20:51:22 +0000 |
| commit | 761e10501fe5ea51f9d8c40d9a200ae27193ab23 (patch) | |
| tree | e2d4bdfdc0b9383661947378a1f183387501637c /cpp/src/qpid/broker/SemanticHandler.cpp | |
| parent | 655b3b5806bafdd784f6a9c242e26341bd6aeccc (diff) | |
| download | qpid-python-761e10501fe5ea51f9d8c40d9a200ae27193ab23.tar.gz | |
* Summary:
- Moved BrokerChannel functionality into Session.
- Moved ChannelHandler methods handling into SessionAdapter.
- Updated all handlers to use session.
(We're still using AMQP channel methods in SessionAdapter)
Roles & responsibilities:
Session:
- represents an _open_ session, may be active or suspended.
- ows all session state including handler chains.
- attahced to SessionAdapter when active, not when suspended.
SessionAdapter:
- reprents the association of a channel with a session.
- owned by Connection, kept in the session map.
- channel open == SessionAdapter.getSessio() != 0
Anything that depends on attachment to a channel, connection or
protocol should be in SessionAdpater. Anything that suvives a
session suspend belongs in Session.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@571575 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 114 |
1 files changed, 42 insertions, 72 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 5e9106c1dd..f1bdc68899 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,25 +20,29 @@ */ #include "SemanticHandler.h" - -#include "boost/format.hpp" +#include "Session.h" +#include "SessionAdapter.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/ChannelCloseOkBody.h" +#include "Connection.h" +#include "Session.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" +#include "qpid/framing/ChannelOpenBody.h" #include "qpid/framing/InvocationVisitor.h" +#include <boost/format.hpp> + using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : - connection(c), channel(c, *this, id) +SemanticHandler::SemanticHandler(Session& s) : + session(s), + connection(s.getAdapter()->getConnection()), + adapter(s, static_cast<ChannelAdapter&>(*this)) { - init(id, connection.getOutput(), connection.getVersion()); - adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); + init(s.getAdapter()->getChannel(), s.out, 0); } void SemanticHandler::handle(framing::AMQFrame& frame) @@ -60,35 +64,18 @@ void SemanticHandler::handle(framing::AMQFrame& frame) //open. execute it (i.e. out-of order execution with respect to //the command id sequence) or queue it up? - try{ - - TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header - - switch(track) { - case SESSION_CONTROL_TRACK://TODO: L2 should be handled by separate handler - handleL2(frame.castBody<AMQMethodBody>()); - break; - case EXECUTION_CONTROL_TRACK: - handleL3(frame.castBody<AMQMethodBody>()); - break; - case MODEL_COMMAND_TRACK: - if (!isOpen()) { - throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); - } - handleCommand(frame.castBody<AMQMethodBody>()); - break; - case MODEL_CONTENT_TRACK: - handleContent(frame); - break; - } + TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header - }catch(const ChannelException& e){ - adapter->getProxy().getChannel().close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); - connection.closeChannel(getId()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.toString(), getClassId(frame), getMethodId(frame)); - }catch(const std::exception& e){ - connection.close(541/*internal error*/, e.what(), getClassId(frame), getMethodId(frame)); + switch(track) { + case EXECUTION_CONTROL_TRACK: + handleL3(frame.getMethod()); + break; + case MODEL_COMMAND_TRACK: + handleCommand(frame.getMethod()); + break; + case MODEL_CONTENT_TRACK: + handleContent(frame); + break; } } @@ -99,13 +86,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - channel.ackCumulative(mark.getValue()); + session.ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -141,7 +128,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { ++(incoming.lwm); - InvocationVisitor v(adapter.get()); + InvocationVisitor v(&adapter); method->accept(v); //TODO: need to account for async store operations and interleaving ++(incoming.hwm); @@ -153,17 +140,6 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) } } -void SemanticHandler::handleL2(framing::AMQMethodBody* method) -{ - if(!method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - throw ConnectionException(504, (boost::format("Attempt to use unopened channel: %g") % getId()).str()); - } - } else { - method->invoke(adapter->getChannelHandler()); - } -} - void SemanticHandler::handleL3(framing::AMQMethodBody* method) { if (!method->invoke(this)) { @@ -181,16 +157,16 @@ void SemanticHandler::handleContent(AMQFrame& frame) msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags msg->setPublisher(&connection); - channel.handle(msg); + session.handle(msg); msgBuilder.end(); //TODO: need to account for async store operations and interleaving ++(incoming.hwm); } } -bool SemanticHandler::isOpen() const -{ - return channel.isOpen(); +bool SemanticHandler::isOpen() const { + // FIXME aconway 2007-08-30: remove. + return true; } DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) @@ -210,45 +186,39 @@ void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ void SemanticHandler::send(const AMQBody& body) { Mutex::ScopedLock l(outLock); - if (body.getMethod() && body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) { - //temporary hack until channel management is moved to its own handler: + // FIXME aconway 2007-08-31: SessionAdapter should not send + // channel/session commands via the semantic handler, it should shortcut + // directly to its own output handler. That will make the CLASS_ID + // part of the test unnecessary. + // + if (body.getMethod() && + body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) + { ++outgoing.hwm; } ChannelAdapter::send(body); } -uint16_t SemanticHandler::getClassId(const AMQFrame& frame) -{ - return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpClassId() : 0; -} - -uint16_t SemanticHandler::getMethodId(const AMQFrame& frame) -{ - return frame.getBody()->type() == METHOD_BODY ? frame.castBody<AMQMethodBody>()->amqpMethodId() : 0; -} - SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) { //will be replaced by field in 0-10 frame header uint8_t type = frame.getBody()->type(); uint16_t classId; switch(type) { - case METHOD_BODY: + case METHOD_BODY: if (frame.castBody<AMQMethodBody>()->isContentBearing()) { return MODEL_CONTENT_TRACK; } classId = frame.castBody<AMQMethodBody>()->amqpClassId(); switch (classId) { - case ChannelOpenBody::CLASS_ID: - return SESSION_CONTROL_TRACK; - case ExecutionCompleteBody::CLASS_ID: + case ExecutionCompleteBody::CLASS_ID: return EXECUTION_CONTROL_TRACK; } return MODEL_COMMAND_TRACK; - case HEADER_BODY: - case CONTENT_BODY: + case HEADER_BODY: + case CONTENT_BODY: return MODEL_CONTENT_TRACK; } throw Exception("Could not determine track"); |
