summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticHandler.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/SemanticHandler.cpp
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticHandler.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp52
1 files changed, 46 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 2b1de1bbc0..e9ec698400 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -25,10 +25,11 @@
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
connection(c),
- channel(c, id, &c.broker.getStore())
+ channel(c, *this, id, &c.broker.getStore())
{
init(id, connection.getOutput(), connection.getVersion());
adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this));
@@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ
}
}
-void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/)
+void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range)
{
- //just record it for now (will eventually need to use it to ack messages):
- outgoing.lwm = SequenceNumber(mark);
+ //record:
+ SequenceNumber mark(cumulative);
+ if (outgoing.lwm < mark) {
+ outgoing.lwm = mark;
+ //ack messages:
+ channel.ack(mark.getValue(), true);
+ //std::cout << "[" << this << "] acknowledged: " << mark << std::endl;
+ }
+ if (range.size() % 2) { //must be even number
+ throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ } else {
+ //TODO: need to keep a record of the full range previously acked
+ for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) {
+ channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ }
+ }
}
void SemanticHandler::flush()
@@ -86,8 +101,8 @@ void SemanticHandler::flush()
//flush doubles as a sync to begin with - send an execution.complete
incoming.lwm = incoming.hwm;
if (isOpen()) {
- /*use dummy value for range which is not yet encoded correctly*/
- send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0)));
+ Mutex::ScopedLock l(outLock);
+ ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
}
@@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb
channel.handleHeartbeat(body);
}
+DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+{
+ Mutex::ScopedLock l(outLock);
+ SequenceNumber copy(outgoing.hwm);
+ ++copy;
+ msg->deliver(*this, copy.getValue(), token, connection.getFrameMax());
+ //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl;
+ return outgoing.hwm.getValue();
+}
+
+void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
+{
+ msg->deliver(*this, tag, token, connection.getFrameMax());
+}
+
+RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ Mutex::ScopedLock l(outLock);
+ uint8_t type(body->type());
+ if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) {
+ ++outgoing.hwm;
+ //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl;
+ }
+ return ChannelAdapter::send(body, action);
+}