summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp28
1 files changed, 22 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 4f088fdf4c..d0804d66b9 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -175,7 +175,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
- sendCompletion();
+ sendAcceptAndCompletion();
}
}
@@ -207,18 +207,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
//hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
incomplete.process(enqueuedOp, true);
- sendCompletion();
+ sendAcceptAndCompletion();
} else {
incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::sendAcceptAndCompletion()
+{
+ if (!accepted.empty()) {
+ getProxy().getMessage().accept(accepted);
+ accepted.clear();
+ }
+ sendCompletion();
+}
+
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
receiverCompleted(msg->getCommandId());
- if (msg->requiresAccept())
- getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
+ if (msg->requiresAccept())
+ accepted.add(msg->getCommandId());
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -240,16 +249,23 @@ void SessionState::handleOut(AMQFrame& frame) {
handler->out(frame);
}
-void SessionState::deliver(DeliveryRecord& msg)
+void SessionState::deliver(DeliveryRecord& msg, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ if (sync) {
+ AMQP_ClientProxy::Execution& p(getProxy().getExecution());
+ Proxy::ScopedSync s(p);
+ p.sync();
+ }
}
-void SessionState::sendCompletion() { handler->sendCompletion(); }
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
+}
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);