summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 14 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 97c7a346de..ba3ef78191 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -472,21 +472,25 @@ run_message_queue(Blocked, State) ->
end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props, Delivered, State) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case rabbit_queue_consumers:deliver(
- fun (true, State2 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ fun (true, BQS) ->
true = BQ:is_empty(BQS),
{AckTag, BQS1} = BQ:publish_delivered(
Message, Props, SenderPid, BQS),
- {{Message, Delivered, AckTag},
- State2#q{backing_queue_state = BQS1}};
- (false, State2) ->
- {{Message, Delivered, undefined}, discard(Delivery, State2)}
- end, qname(State), State, State#q.consumers) of
- {delivered, Blocked, State1, Consumers} ->
+ {{Message, Delivered, AckTag}, {ack, BQS1}};
+ (false, _BQS) ->
+ {{Message, Delivered, undefined}, noack}
+ end, qname(State), BQS, State#q.consumers) of
+ {delivered, Blocked, {ack, BQS2}, Consumers} ->
{delivered, notify_decorators(
- Blocked, State1#q{consumers = Consumers})};
+ Blocked, State#q{backing_queue_state = BQS2,
+ consumers = Consumers})};
+ {delivered, Blocked, noack, Consumers} ->
+ {delivered, notify_decorators(
+ Blocked, discard(Delivery,
+ State#q{consumers = Consumers}))};
{undelivered, Blocked, Consumers} ->
{undelivered, notify_decorators(
Blocked, State#q{consumers = Consumers})}