summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp37
1 files changed, 33 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 9b44f31e14..e012d693fb 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::adjustFlow(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->acknowledged(delivery);
+ get_pointer(i)->adjustFlow(delivery);
}
}
-void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify()
parent->outputTasks.activateOutput();
}
+
+void SemanticState::accepted(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(range.start, range.end);
+ }
+}
+
+void SemanticState::completed(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ requestDispatch();
+}
+
}} // namespace qpid::broker