diff options
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 74e5504f17..674d0e9505 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -235,24 +235,37 @@ void Channel::complete(Message::shared_ptr msg) { // TODO astitcher 2007-02-08 This only deals correctly with non batched responses void Channel::ack(){ - ack(getRequestInProgress(), false); + ack(getFirstAckRequest(), getLastAckRequest()); } -void Channel::ack(u_int64_t deliveryTag, bool multiple) -{ +// Used by Basic +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if (multiple) + ack(0, deliveryTag); + else + ack(deliveryTag, deliveryTag); +} + +void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){ if(transactional){ - accumulatedAck.update(deliveryTag, multiple); + //FIXME astitcher This only works for Basic style acks + accumulatedAck.update(lastTag, lastTag); + //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); + ack_iterator j = (firstTag == 0) ? + unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); + if(i == unacked.end()){ - throw InvalidAckException(); - }else if(multiple){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + }else if(i!=j){ ack_iterator end = ++i; - for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + for_each(j, end, mem_fun_ref(&DeliveryRecord::discard)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: |
