diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 97c7a346de..ba3ef78191 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -472,21 +472,25 @@ run_message_queue(Blocked, State) -> end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, - Props, Delivered, State) -> + Props, Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case rabbit_queue_consumers:deliver( - fun (true, State2 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + fun (true, BQS) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State2#q{backing_queue_state = BQS1}}; - (false, State2) -> - {{Message, Delivered, undefined}, discard(Delivery, State2)} - end, qname(State), State, State#q.consumers) of - {delivered, Blocked, State1, Consumers} -> + {{Message, Delivered, AckTag}, {ack, BQS1}}; + (false, _BQS) -> + {{Message, Delivered, undefined}, noack} + end, qname(State), BQS, State#q.consumers) of + {delivered, Blocked, {ack, BQS2}, Consumers} -> {delivered, notify_decorators( - Blocked, State1#q{consumers = Consumers})}; + Blocked, State#q{backing_queue_state = BQS2, + consumers = Consumers})}; + {delivered, Blocked, noack, Consumers} -> + {delivered, notify_decorators( + Blocked, discard(Delivery, + State#q{consumers = Consumers}))}; {undelivered, Blocked, Consumers} -> {undelivered, notify_decorators( Blocked, State#q{consumers = Consumers})} |
