diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 37 |
1 files changed, 33 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 9b44f31e14..e012d693fb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::acknowledged(const DeliveryRecord& delivery) +void SemanticState::adjustFlow(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->acknowledged(delivery); + get_pointer(i)->adjustFlow(delivery); } } -void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify() parent->outputTasks.activateOutput(); } + +void SemanticState::accepted(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(range.start, range.end); + } +} + +void SemanticState::completed(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + requestDispatch(); +} + }} // namespace qpid::broker |
