diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-11 15:07:08 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-11 15:07:08 +0100 |
| commit | e7bc1d4c07ca924d85ffb8461077caba32b21ea3 (patch) | |
| tree | c2fdc8317aa55d44a230b25d8c8ba08e3bef9e9c | |
| parent | b204aaac6e787a1b28ceaa60ba10e07ab2635a72 (diff) | |
| download | rabbitmq-server-git-e7bc1d4c07ca924d85ffb8461077caba32b21ea3.tar.gz | |
Unregister waiting consumers in QQ when channel goes down
For single active consumer. Otherwise they still show up in the
management plugin. Worse, a consumer from a dead channel can end up
being the single active consumer, which stops deliveries.
References #1825
| -rw-r--r-- | src/rabbit_fifo.erl | 98 |
1 files changed, 89 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1536cd1f51..cc7568720d 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -480,8 +480,8 @@ apply(_, {down, ConsumerPid, noconnection}, end, %% TODO: should we run a checkout here? {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, _Info}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> +apply(_, {down, Pid, _Info}, #state{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -492,13 +492,14 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, error -> State0 end, + {Effects1, State2} = maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1}, + {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2}, DownConsumers), - checkout(State2, Effects1); + checkout(State3, Effects2); apply(_, {nodeup, Node}, #state{consumers = Cons0, enqueuers = Enqs0, service_queue = SQ0} = State0) -> @@ -538,6 +539,23 @@ apply(_, {nodedown, _Node}, State) -> apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. +maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) -> + {[], State}; +maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active, + waiting_consumers = []} = State) -> + {[], State}; +maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> + % get cancel effects for down waiting consumers + DownWaitingConsumers = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), + Effects1 = lists:foldl(fun ({ConsumerId, _}, Effects) -> + cancel_consumer_effects(ConsumerId, State0, Effects) + end, [], DownWaitingConsumers), + % update state to have only up waiting consumers + WaitingConsumersStillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), + State2 = State0#state{waiting_consumers = WaitingConsumersStillUp}, + {Effects1, State2}. + -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, @@ -739,8 +757,9 @@ cancel_consumer(ConsumerId, % The cancelled consumer is not the active one % Just remove it from idle_consumers {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0), + Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here - {Effects0, State0#state{waiting_consumers = WaitingConsumers1}} + {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. cancel_consumer0(ConsumerId, @@ -1993,7 +2012,7 @@ single_active_consumer_test() -> ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), % cancelling a waiting consumer - {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), + {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#state.consumers)), ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), @@ -2001,30 +2020,91 @@ single_active_consumer_test() -> ?assertEqual(2, length(State2#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects2)), % cancelling the active consumer - {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), + {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#state.consumers)), ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), % the waiting consumer list is now empty ?assertEqual(0, length(State4#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects3)), % cancelling the last consumer - {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), + {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), % no active consumer anymore ?assertEqual(0, map_size(State5#state.consumers)), % still nothing in the waiting list ?assertEqual(0, length(State5#state.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects4)), + + ok. + +single_active_consumer_cancel_consumer_when_channel_is_down_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + % the channel of the active consumer goes down + {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1), + % fell back to another consumer + ?assertEqual(1, map_size(State2#state.consumers)), + % there are still waiting consumers + ?assertEqual(2, length(State2#state.waiting_consumers)), + % the effect to unregister the consumer is there + ?assertEqual(1, length(Effects)), + + % the channel of the active consumer and a waiting consumer goes down + {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), + % fell back to another consumer + ?assertEqual(1, map_size(State3#state.consumers)), + % no more waiting consumer + ?assertEqual(0, length(State3#state.waiting_consumers)), + % effects to cancel both consumers of this channel + ?assertEqual(2, length(Effects2)), + + % the last channel goes down + {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), + % no more consumers + ?assertEqual(0, map_size(State4#state.consumers)), + ?assertEqual(0, length(State4#state.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects3)), ok. |
