summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
committerGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
commit5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch)
tree8417c3abe9dd81e6a73084aa36371981e06f9e27 /cpp/src/qpid/broker/SemanticState.cpp
parent9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff)
downloadqpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5851eeeafb..f372c60044 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::complete(DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->adjustFlow(delivery);
+ get_pointer(i)->complete(delivery);
}
}
-void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ if (!delivery.isComplete()) {
+ delivery.complete();
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ }
}
}
@@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
dtxBuffer->enlist(txAck);
}
} else {
- for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(range.start, range.end);
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
requestDispatch();
}