summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-29 18:02:05 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-30 10:05:49 +0100
commitde686a07acf0a24bca14cfe076a6e2b0e6e810d7 (patch)
tree021c61a538cc25bf3336c5f192a8549ef0797d35 /src
parent80df800a688440d047885c977578af38b6699760 (diff)
downloadrabbitmq-server-git-de686a07acf0a24bca14cfe076a6e2b0e6e810d7.tar.gz
QQ SAC: process all consumers on noconnection
There can be multiple single active consumers in the consumers map if all but one are cancelled. Take this into account when processing noconnections. [#165438843]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl53
1 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 609fa0111c..d5893e9d7f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -331,32 +331,33 @@ apply(Meta, {down, Pid, noconnection},
waiting_consumers = Waiting0,
enqueuers = Enqs0} = State0) ->
Node = node(Pid),
- %% if the pid refers to the active consumer, mark it as suspected and return
- %% it to the waiting queue
+ %% if the pid refers to an active or cancelled consumer,
+ %% mark it as suspected and return it to the waiting queue
{State1, Effects0} =
- case maps:to_list(Cons0) of
- [{{_, P} = Cid, C0}] when node(P) =:= Node ->
- %% the consumer should be returned to waiting
- %% and checked out messages should be returned
- Effs = consumer_update_active_effects(
- State0, Cid, C0, false, suspected_down, []),
- Checked = C0#consumer.checked_out,
- Credit = increase_credit(C0, maps:size(Checked)),
- {St, Effs1} = return_all(State0, Effs,
- Cid, C0#consumer{credit = Credit}),
- %% if the consumer was cancelled there is a chance it got
- %% removed when returning hence we need to be defensive here
- Waiting = case St#?MODULE.consumers of
- #{Cid := C} ->
- Waiting0 ++ [{Cid, C}];
- _ ->
- Waiting0
- end,
- {St#?MODULE{consumers = #{},
- waiting_consumers = Waiting},
- Effs1};
- _ -> {State0, []}
- end,
+ maps:fold(fun({_, P} = Cid, C0, {S0, E0})
+ when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ S0, Cid, C0, false, suspected_down, E0),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(S0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?MODULE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
+ waiting_consumers = Waiting},
+ Effs1};
+ (_, _, S) ->
+ S
+ end, {State0, []}, Cons0),
WaitingConsumers = update_waiting_consumer_status(Node, State1,
suspected_down),
@@ -822,7 +823,7 @@ cancel_consumer(ConsumerId,
Effects0, Reason),
activate_next_consumer(State1, Effects1);
false ->
- % The cancelled consumer is not the active one
+ % The cancelled consumer is not active or cancelled
% Just remove it from idle_consumers
Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),