summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp12
1 files changed, 8 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d9896b388b..f9f75679e5 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -256,7 +256,9 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
arguments(_arguments),
msgCredit(0),
byteCredit(0),
- notifyEnabled(true) {}
+ notifyEnabled(true),
+ syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
+ deliveryCount(0) {}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -267,7 +269,9 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
- parent->deliver(record);
+ bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
+ if (sync) deliveryCount = 0;//reset
+ parent->deliver(record, sync);
if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
@@ -449,9 +453,9 @@ void SemanticState::recover(bool requeue)
}
}
-void SemanticState::deliver(DeliveryRecord& msg)
+void SemanticState::deliver(DeliveryRecord& msg, bool sync)
{
- return deliveryAdapter.deliver(msg);
+ return deliveryAdapter.deliver(msg, sync);
}
SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)