diff options
| author | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
| commit | b4dac41573e33e1a04a2b7b8c9a35f5e72b662bc (patch) | |
| tree | 15e25a2dc7b42f3788f9dccdd7e632dcf8c346d2 /cpp/src/qpid/broker/SemanticState.cpp | |
| parent | 928699508993a5ccc59254027773281883d1e973 (diff) | |
| download | qpid-python-b4dac41573e33e1a04a2b7b8c9a35f5e72b662bc.tar.gz | |
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema.
The preview codepath (99-0) remains unaltered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 37 |
1 files changed, 33 insertions, 4 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 9b44f31e14..e012d693fb 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::acknowledged(const DeliveryRecord& delivery) +void SemanticState::adjustFlow(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->acknowledged(delivery); + get_pointer(i)->adjustFlow(delivery); } } -void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify() parent->outputTasks.activateOutput(); } + +void SemanticState::accepted(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(range.start, range.end); + } +} + +void SemanticState::completed(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + requestDispatch(); +} + }} // namespace qpid::broker |
