diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 58 |
1 files changed, 12 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 60c6a5cc10..2dd7861e4a 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -37,13 +37,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(Session& s) : - session(s), - connection(s.getAdapter()->getConnection()), - adapter(s, static_cast<ChannelAdapter&>(*this)) -{ - init(s.getAdapter()->getChannel(), s.out, 0); -} +SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -86,24 +80,24 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - session.ackCumulative(mark.getValue()); + getSession().ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } void SemanticHandler::sendCompletion() { - if (isOpen()) { + if (getSessionHandler()) { SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range)); + getProxy().getExecution().complete(mark.getValue(), range); } } void SemanticHandler::flush() @@ -129,7 +123,8 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = incoming.next(); + BrokerAdapter adapter(getSession()); InvocationVisitor v(&adapter); method->accept(v); incoming.complete(id); @@ -137,7 +132,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult())); + getProxy().getExecution().result(id.getValue(), v.getResult()); } //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -159,45 +154,24 @@ void SemanticHandler::handleContent(AMQFrame& frame) } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&connection); - session.handle(msg); + msg->setPublisher(&getConnection()); + getSession().handle(msg); msgBuilder.end(); incoming.track(msg); //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } } } -bool SemanticHandler::isOpen() const { - // FIXME aconway 2007-08-30: remove. - return true; -} - DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); + MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax()); return outgoing.hwm; } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax()); -} - -void SemanticHandler::send(const AMQBody& body) -{ - Mutex::ScopedLock l(outLock); - // FIXME aconway 2007-08-31: SessionHandler should not send - // channel/session commands via the semantic handler, it should shortcut - // directly to its own output handler. That will make the CLASS_ID - // part of the test unnecessary. - // - if (body.getMethod() && - body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID) - { - ++outgoing.hwm; - } - ChannelAdapter::send(body); + MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax()); } SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) @@ -225,11 +199,3 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) throw Exception("Could not determine track"); } -//ChannelAdapter virtual methods, no longer used: -void SemanticHandler::handleMethod(framing::AMQMethodBody*){} - -void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {} - -void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {} - -void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {} |
