diff options
| author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
| commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
| tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/SemanticHandler.cpp | |
| parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
| download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz | |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 52 |
1 files changed, 46 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 2b1de1bbc0..e9ec698400 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -25,10 +25,11 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : connection(c), - channel(c, id, &c.broker.getStore()) + channel(c, *this, id, &c.broker.getStore()) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); @@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ } } -void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/) +void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) { - //just record it for now (will eventually need to use it to ack messages): - outgoing.lwm = SequenceNumber(mark); + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + channel.ack(mark.getValue(), true); + //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; + } + if (range.size() % 2) { //must be even number + throw ConnectionException(530, "Received odd number of elements in ranged mark"); + } else { + //TODO: need to keep a record of the full range previously acked + for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { + channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + } + } } void SemanticHandler::flush() @@ -86,8 +101,8 @@ void SemanticHandler::flush() //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; if (isOpen()) { - /*use dummy value for range which is not yet encoded correctly*/ - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + Mutex::ScopedLock l(outLock); + ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } } @@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb channel.handleHeartbeat(body); } +DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) +{ + Mutex::ScopedLock l(outLock); + SequenceNumber copy(outgoing.hwm); + ++copy; + msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); + //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + return outgoing.hwm.getValue(); +} + +void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) +{ + msg->deliver(*this, tag, token, connection.getFrameMax()); +} + +RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action) +{ + Mutex::ScopedLock l(outLock); + uint8_t type(body->type()); + if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { + ++outgoing.hwm; + //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + } + return ChannelAdapter::send(body, action); +} |
