summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp29
1 files changed, 21 insertions, 8 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 74e5504f17..674d0e9505 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -235,24 +235,37 @@ void Channel::complete(Message::shared_ptr msg) {
// TODO astitcher 2007-02-08 This only deals correctly with non batched responses
void Channel::ack(){
- ack(getRequestInProgress(), false);
+ ack(getFirstAckRequest(), getLastAckRequest());
}
-void Channel::ack(u_int64_t deliveryTag, bool multiple)
-{
+// Used by Basic
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
+}
+
+void Channel::ack(u_int64_t firstTag, u_int64_t lastTag){
if(transactional){
- accumulatedAck.update(deliveryTag, multiple);
+ //FIXME astitcher This only works for Basic style acks
+ accumulatedAck.update(lastTag, lastTag);
+
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
}else{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+
if(i == unacked.end()){
- throw InvalidAckException();
- }else if(multiple){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }else if(i!=j){
ack_iterator end = ++i;
- for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ for_each(j, end, mem_fun_ref(&DeliveryRecord::discard));
unacked.erase(unacked.begin(), end);
//recalculate the prefetch: