summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-11 15:07:08 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-11 15:07:08 +0100
commite7bc1d4c07ca924d85ffb8461077caba32b21ea3 (patch)
treec2fdc8317aa55d44a230b25d8c8ba08e3bef9e9c
parentb204aaac6e787a1b28ceaa60ba10e07ab2635a72 (diff)
downloadrabbitmq-server-git-e7bc1d4c07ca924d85ffb8461077caba32b21ea3.tar.gz
Unregister waiting consumers in QQ when channel goes down
For single active consumer. Otherwise they still show up in the management plugin. Worse, a consumer from a dead channel can end up being the single active consumer, which stops deliveries. References #1825
-rw-r--r--src/rabbit_fifo.erl98
1 files changed, 89 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 1536cd1f51..cc7568720d 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -480,8 +480,8 @@ apply(_, {down, ConsumerPid, noconnection},
end,
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -492,13 +492,14 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
error ->
State0
end,
+ {Effects1, State2} = maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, State1),
% return checked out messages to main queue
% Find the consumers for the down pid
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1},
+ {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2},
DownConsumers),
- checkout(State2, Effects1);
+ checkout(State3, Effects2);
apply(_, {nodeup, Node}, #state{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
@@ -538,6 +539,23 @@ apply(_, {nodedown, _Node}, State) ->
apply(_, #update_config{config = Conf}, State) ->
{update_config(Conf, State), ok}.
+maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) ->
+ {[], State};
+maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active,
+ waiting_consumers = []} = State) ->
+ {[], State};
+maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ % get cancel effects for down waiting consumers
+ DownWaitingConsumers = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0),
+ Effects1 = lists:foldl(fun ({ConsumerId, _}, Effects) ->
+ cancel_consumer_effects(ConsumerId, State0, Effects)
+ end, [], DownWaitingConsumers),
+ % update state to have only up waiting consumers
+ WaitingConsumersStillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0),
+ State2 = State0#state{waiting_consumers = WaitingConsumersStillUp},
+ {Effects1, State2}.
+
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
@@ -739,8 +757,9 @@ cancel_consumer(ConsumerId,
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
{value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0),
+ Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
- {Effects0, State0#state{waiting_consumers = WaitingConsumers1}}
+ {Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
cancel_consumer0(ConsumerId,
@@ -1993,7 +2012,7 @@ single_active_consumer_test() ->
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
- {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
@@ -2001,30 +2020,91 @@ single_active_consumer_test() ->
?assertEqual(2, length(State2#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?assertEqual(1, length(Effects2)),
% cancelling the active consumer
- {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#state.waiting_consumers)),
+ % there are some effects to unregister the consumer
+ ?assertEqual(1, length(Effects3)),
% cancelling the last consumer
- {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
?assertEqual(0, length(State5#state.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?assertEqual(1 + 1, length(Effects4)),
+
+ ok.
+
+single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ % the channel of the active consumer goes down
+ {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State2#state.consumers)),
+ % there are still waiting consumers
+ ?assertEqual(2, length(State2#state.waiting_consumers)),
+ % the effect to unregister the consumer is there
+ ?assertEqual(1, length(Effects)),
+
+ % the channel of the active consumer and a waiting consumer goes down
+ {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
+ % fell back to another consumer
+ ?assertEqual(1, map_size(State3#state.consumers)),
+ % no more waiting consumer
+ ?assertEqual(0, length(State3#state.waiting_consumers)),
+ % effects to cancel both consumers of this channel
+ ?assertEqual(2, length(Effects2)),
+
+ % the last channel goes down
+ {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
+ % no more consumers
+ ?assertEqual(0, map_size(State4#state.consumers)),
+ ?assertEqual(0, length(State4#state.waiting_consumers)),
+ % there is an effect to unregister the consumer + queue inactive effect
+ ?assertEqual(1 + 1, length(Effects3)),
ok.