From afefc741a9ad4c6299a47805a45a1c81a048e0a2 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 20 Jan 2009 13:30:08 +0000 Subject: QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state * altered replication protocol to detect and eliminate duplicates * added support for acknowledged transfer over inter-broker bridges * added option to qpid-route to control this git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SessionState.cpp | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'cpp/src/qpid/broker/SessionState.cpp') diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 4f088fdf4c..d0804d66b9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -175,7 +175,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } if (method->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } } @@ -207,18 +207,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } } +void SessionState::sendAcceptAndCompletion() +{ + if (!accepted.empty()) { + getProxy().getMessage().accept(accepted); + accepted.clear(); + } + sendCompletion(); +} + void SessionState::enqueued(boost::intrusive_ptr msg) { receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); + if (msg->requiresAccept()) + accepted.add(msg->getCommandId()); } void SessionState::handleIn(AMQFrame& frame) { @@ -240,16 +249,23 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -void SessionState::deliver(DeliveryRecord& msg) +void SessionState::deliver(DeliveryRecord& msg, bool sync) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + if (sync) { + AMQP_ClientProxy::Execution& p(getProxy().getExecution()); + Proxy::ScopedSync s(p); + p.sync(); + } } -void SessionState::sendCompletion() { handler->sendCompletion(); } +void SessionState::sendCompletion() { + handler->sendCompletion(); +} void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); -- cgit v1.2.1