diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 53 |
1 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 609fa0111c..d5893e9d7f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -331,32 +331,33 @@ apply(Meta, {down, Pid, noconnection}, waiting_consumers = Waiting0, enqueuers = Enqs0} = State0) -> Node = node(Pid), - %% if the pid refers to the active consumer, mark it as suspected and return - %% it to the waiting queue + %% if the pid refers to an active or cancelled consumer, + %% mark it as suspected and return it to the waiting queue {State1, Effects0} = - case maps:to_list(Cons0) of - [{{_, P} = Cid, C0}] when node(P) =:= Node -> - %% the consumer should be returned to waiting - %% and checked out messages should be returned - Effs = consumer_update_active_effects( - State0, Cid, C0, false, suspected_down, []), - Checked = C0#consumer.checked_out, - Credit = increase_credit(C0, maps:size(Checked)), - {St, Effs1} = return_all(State0, Effs, - Cid, C0#consumer{credit = Credit}), - %% if the consumer was cancelled there is a chance it got - %% removed when returning hence we need to be defensive here - Waiting = case St#?MODULE.consumers of - #{Cid := C} -> - Waiting0 ++ [{Cid, C}]; - _ -> - Waiting0 - end, - {St#?MODULE{consumers = #{}, - waiting_consumers = Waiting}, - Effs1}; - _ -> {State0, []} - end, + maps:fold(fun({_, P} = Cid, C0, {S0, E0}) + when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + S0, Cid, C0, false, suspected_down, E0), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(S0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + waiting_consumers = Waiting}, + Effs1}; + (_, _, S) -> + S + end, {State0, []}, Cons0), WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -822,7 +823,7 @@ cancel_consumer(ConsumerId, Effects0, Reason), activate_next_consumer(State1, Effects1); false -> - % The cancelled consumer is not the active one + % The cancelled consumer is not active or cancelled % Just remove it from idle_consumers Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), |
