summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl48
1 files changed, 22 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ba3ef78191..b5922bb6f3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -454,20 +454,19 @@ discard(#delivery{sender = SenderPid,
run_message_queue(State) -> run_message_queue([], State).
-run_message_queue(Blocked, State) ->
+run_message_queue(Blocked, State = #q{consumers = Consumers}) ->
case is_empty(State) of
- true -> notify_decorators(lists:append(Blocked), State);
+ true -> blocked(lists:append(Blocked), Consumers, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired, State1) ->
fetch(AckRequired, State1)
- end, qname(State), State, State#q.consumers) of
- {delivered, MoreBlocked, State2, Consumers} ->
+ end, qname(State), State, Consumers) of
+ {delivered, MoreBlocked, State2, Consumers1} ->
run_message_queue([MoreBlocked | Blocked],
- State2#q{consumers = Consumers});
- {undelivered, MoreBlocked, Consumers} ->
- notify_decorators(
- lists:append([MoreBlocked | Blocked]),
- State#q{consumers = Consumers})
+ State2#q{consumers = Consumers1});
+ {undelivered, MoreBlocked, Consumers1} ->
+ blocked(lists:append([MoreBlocked | Blocked]),
+ Consumers1, State)
end
end.
@@ -475,31 +474,28 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
case rabbit_queue_consumers:deliver(
- fun (true, BQS) ->
- true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS),
- {{Message, Delivered, AckTag}, {ack, BQS1}};
+ fun (true, BQS1) ->
+ true = BQ:is_empty(BQS1),
+ {AckTag, BQS2} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS1),
+ {{Message, Delivered, AckTag}, {ack, BQS2}};
(false, _BQS) ->
{{Message, Delivered, undefined}, noack}
end, qname(State), BQS, State#q.consumers) of
- {delivered, Blocked, {ack, BQS2}, Consumers} ->
- {delivered, notify_decorators(
- Blocked, State#q{backing_queue_state = BQS2,
- consumers = Consumers})};
+ {delivered, Blocked, {ack, BQS3}, Consumers} ->
+ {delivered, blocked(Blocked, Consumers,
+ State#q{backing_queue_state = BQS3})};
{delivered, Blocked, noack, Consumers} ->
- {delivered, notify_decorators(
- Blocked, discard(Delivery,
- State#q{consumers = Consumers}))};
+ {delivered, discard(Delivery, blocked(Blocked, Consumers, State))};
{undelivered, Blocked, Consumers} ->
- {undelivered, notify_decorators(
- Blocked, State#q{consumers = Consumers})}
+ {undelivered, blocked(Blocked, Consumers, State)}
end.
-notify_decorators(Blocked, State) ->
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State) ||
+blocked(Blocked, Consumers, State) ->
+ State1 = State#q{consumers = Consumers},
+ [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State1) ||
{_ChPid, CTag} <- Blocked],
- State.
+ State1.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,