From 5d8a9df4ec3a4f030ed80e143ce6986c19ab800a Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 7 Mar 2008 13:20:02 +0000 Subject: Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance. Converted flow control python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/SemanticState.cpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'cpp/src/qpid/broker/SemanticState.cpp') diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5851eeeafb..f372c60044 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::complete(DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->adjustFlow(delivery); + get_pointer(i)->complete(delivery); } } -void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); + if (!delivery.isComplete()) { + delivery.complete(); + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + } } } @@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) dtxBuffer->enlist(txAck); } } else { - for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(range.start, range.end); + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); } } void SemanticState::completed(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); - for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); requestDispatch(); } -- cgit v1.2.1