summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp49
1 files changed, 13 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 573a567da6..5f04136444 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -49,7 +49,7 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = next++;
Invoker::Result invocation = invoke(adapter, *method);
- incoming.complete(id);
+ completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
@@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
getProxy().getExecution().result(id.getValue(), invocation.getResult());
}
if (method->isSync()) {
- incoming.sync(id);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
+ SequenceNumber id = next++;
+ msgBuilder.start(id);
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
@@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- incoming.track(msg);
+ //TODO: may want to hold up execution until async enqueue is complete
+ completed.add(msg->getCommandId());
if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
sendCompletion();
}
}
@@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame)
void SessionState::handle(AMQFrame& frame)
{
+ received(frame);
+
//TODO: make command handling more uniform, regardless of whether
//commands carry content. (For now, assume all single frame
//assmblies are non-content bearing and all content-bearing
@@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t
void SessionState::sendCompletion()
{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- semanticState.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SessionState::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-
-void SessionState::sync()
-{
- incoming.sync();
- sendCompletion();
+ handler->sendCompletion();
}
-void SessionState::noop()
+void SessionState::complete(const SequenceSet& commands)
{
- incoming.noop();
+ knownCompleted.add(commands);
+ commands.for_each(ackOp);
}