summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
committerGordon Sim <gsim@apache.org>2009-01-20 13:30:08 +0000
commitafefc741a9ad4c6299a47805a45a1c81a048e0a2 (patch)
tree70120255a090b5def48b4f5c72d2c1004841772d /cpp/src/qpid/broker/SemanticState.cpp
parent1d5e6b196da4ba618ebc91054ee77e6c3c005333 (diff)
downloadqpid-python-afefc741a9ad4c6299a47805a45a1c81a048e0a2.tar.gz
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates * added support for acknowledged transfer over inter-broker bridges * added option to qpid-route to control this git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736018 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp12
1 files changed, 8 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d9896b388b..f9f75679e5 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -256,7 +256,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
arguments(_arguments),
msgCredit(0),
byteCredit(0),
- notifyEnabled(true) {}
+ notifyEnabled(true),
+ syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
+ deliveryCount(0) {}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -267,7 +269,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
- parent->deliver(record);
+ bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
+ if (sync) deliveryCount = 0;//reset
+ parent->deliver(record, sync);
if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
@@ -449,9 +453,9 @@ void SemanticState::recover(bool requeue)
}
}
-void SemanticState::deliver(DeliveryRecord& msg)
+void SemanticState::deliver(DeliveryRecord& msg, bool sync)
{
- return deliveryAdapter.deliver(msg);
+ return deliveryAdapter.deliver(msg, sync);
}
SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)