summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp58
1 files changed, 12 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 60c6a5cc10..2dd7861e4a 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -37,13 +37,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(Session& s) :
- session(s),
- connection(s.getAdapter()->getConnection()),
- adapter(s, static_cast<ChannelAdapter&>(*this))
-{
- init(s.getAdapter()->getChannel(), s.out, 0);
-}
+SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
void SemanticHandler::handle(framing::AMQFrame& frame)
{
@@ -86,24 +80,24 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- session.ackCumulative(mark.getValue());
+ getSession().ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
void SemanticHandler::sendCompletion()
{
- if (isOpen()) {
+ if (getSessionHandler()) {
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
+ getProxy().getExecution().complete(mark.getValue(), range);
}
}
void SemanticHandler::flush()
@@ -129,7 +123,8 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = incoming.next();
+ BrokerAdapter adapter(getSession());
InvocationVisitor v(&adapter);
method->accept(v);
incoming.complete(id);
@@ -137,7 +132,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
+ getProxy().getExecution().result(id.getValue(), v.getResult());
}
//TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
//TODO: if window gets too large send unsolicited completion
@@ -159,45 +154,24 @@ void SemanticHandler::handleContent(AMQFrame& frame)
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&connection);
- session.handle(msg);
+ msg->setPublisher(&getConnection());
+ getSession().handle(msg);
msgBuilder.end();
incoming.track(msg);
//TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
}
}
-bool SemanticHandler::isOpen() const {
- // FIXME aconway 2007-08-30: remove.
- return true;
-}
-
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+ MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
return outgoing.hwm;
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
-}
-
-void SemanticHandler::send(const AMQBody& body)
-{
- Mutex::ScopedLock l(outLock);
- // FIXME aconway 2007-08-31: SessionHandler should not send
- // channel/session commands via the semantic handler, it should shortcut
- // directly to its own output handler. That will make the CLASS_ID
- // part of the test unnecessary.
- //
- if (body.getMethod() &&
- body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
- {
- ++outgoing.hwm;
- }
- ChannelAdapter::send(body);
+ MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
}
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
@@ -225,11 +199,3 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
throw Exception("Could not determine track");
}
-//ChannelAdapter virtual methods, no longer used:
-void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
-
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
-
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
-
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}