diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-20 13:30:08 +0000 |
| commit | afefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch) | |
| tree | 70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/broker/SessionState.cpp | |
| parent | 1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff) | |
| download | qpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz | |
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control this
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 4f088fdf4c..d0804d66b9 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -175,7 +175,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } if (method->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } } @@ -207,18 +207,27 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); - sendCompletion(); + sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } } +void SessionState::sendAcceptAndCompletion() +{ + if (!accepted.empty()) { + getProxy().getMessage().accept(accepted); + accepted.clear(); + } + sendCompletion(); +} + void SessionState::enqueued(boost::intrusive_ptr<Message> msg) { receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); + if (msg->requiresAccept()) + accepted.add(msg->getCommandId()); } void SessionState::handleIn(AMQFrame& frame) { @@ -240,16 +249,23 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -void SessionState::deliver(DeliveryRecord& msg) +void SessionState::deliver(DeliveryRecord& msg, bool sync) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. + if (sync) { + AMQP_ClientProxy::Execution& p(getProxy().getExecution()); + Proxy::ScopedSync s(p); + p.sync(); + } } -void SessionState::sendCompletion() { handler->sendCompletion(); } +void SessionState::sendCompletion() { + handler->sendCompletion(); +} void SessionState::senderCompleted(const SequenceSet& commands) { qpid::SessionState::senderCompleted(commands); |
