summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5851eeeafb..f372c60044 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::complete(DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->adjustFlow(delivery);
+ get_pointer(i)->complete(delivery);
}
}
-void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ if (!delivery.isComplete()) {
+ delivery.complete();
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ }
}
}
@@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
dtxBuffer->enlist(txAck);
}
} else {
- for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(range.start, range.end);
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
requestDispatch();
}