diff options
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 4f088fdf4c..d0804d66b9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -175,7 +175,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } if (method->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } } @@ -207,18 +207,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } } +void SessionState::sendAcceptAndCompletion() +{ + if (!accepted.empty()) { + getProxy().getMessage().accept(accepted); + accepted.clear(); + } + sendCompletion(); +} + void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); + if (msg->requiresAccept()) + accepted.add(msg->getCommandId()); } void SessionState::handleIn(AMQFrame& frame) { @@ -240,16 +249,23 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -void SessionState::deliver(DeliveryRecord& msg) +void SessionState::deliver(DeliveryRecord& msg, bool sync) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + if (sync) { + AMQP_ClientProxy::Execution& p(getProxy().getExecution()); + Proxy::ScopedSync s(p); + p.sync(); + } } -void SessionState::sendCompletion() { handler->sendCompletion(); } +void SessionState::sendCompletion() { + handler->sendCompletion(); +} void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); |
