diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-07 13:20:02 +0000 |
| commit | 5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch) | |
| tree | 8417c3abe9dd81e6a73084aa36371981e06f9e27 /cpp/src/qpid/broker/SemanticState.cpp | |
| parent | 9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff) | |
| download | qpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz | |
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5851eeeafb..f372c60044 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::complete(DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->adjustFlow(delivery); + get_pointer(i)->complete(delivery); } } -void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); + if (!delivery.isComplete()) { + delivery.complete(); + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); + } } } @@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) dtxBuffer->enlist(txAck); } } else { - for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); - unacked.erase(range.start, range.end); + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); } } void SemanticState::completed(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); - for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1)); + unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant)); requestDispatch(); } |
