diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 22 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 64 | ||||
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 2 |
4 files changed, 158 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fb6f9ce770..1d53061a8a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -62,6 +62,7 @@ make_discard/2, make_credit/4, make_purge/0, + make_purge_nodes/1, make_update_config/1 ]). @@ -83,6 +84,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). +-record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). -opaque protocol() :: @@ -93,6 +95,7 @@ #discard{} | #credit{} | #purge{} | + #purge_nodes{} | #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). @@ -396,28 +399,9 @@ apply(Meta, {down, Pid, noconnection}, _ -> [{monitor, node, Node}] end ++ Effects1, - %% TODO: should we run a checkout here? checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); -apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, - {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), - % return checked out messages to main queue - % Find the consumers for the down pid - DownConsumers = maps:keys( - maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E, down) - end, {State2, Effects1}, DownConsumers), +apply(Meta, {down, Pid, _Info}, State0) -> + {State, Effects} = handle_down(Pid, State0), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, @@ -448,16 +432,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), Waiting = update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + State1 = State0#?MODULE{consumers = Cons1, + enqueuers = Enqs1, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; +apply(_, #purge_nodes{nodes = Nodes}, State0) -> + {State, Effects} = lists:foldl(fun(Node, {S, E}) -> + purge_node(Node, S, E) + end, {State0, []}, Nodes), + {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). +purge_node(Node, State, Effects) -> + lists:foldl(fun(Pid, {S0, E0}) -> + {S, E} = handle_down(Pid, S0), + {S, E0 ++ E} + end, {State, Effects}, all_pids_for(Node, State)). + +%% any downs that re not noconnection +handle_down(Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + % Remove any enqueuer for the same pid and enqueue any pending messages + % This should be ok as we won't see any more enqueues from this pid + State1 = case maps:take(Pid, Enqs0) of + {#enqueuer{pending = Pend}, Enqs} -> + lists:foldl(fun ({_, RIdx, RawMsg}, S) -> + enqueue(RIdx, RawMsg, S) + end, State0#?MODULE{enqueuers = Enqs}, Pend); + error -> + State0 + end, + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), + % return checked out messages to main queue + % Find the consumers for the down pid + DownConsumers = maps:keys( + maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), + lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E, down) + end, {State2, Effects1}, DownConsumers). + consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, @@ -556,8 +574,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, + %% TODO: call a handler that works out if any known nodes need to be + %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, - handle_tick, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, @@ -1495,6 +1515,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. +-spec make_purge_nodes([node()]) -> protocol(). +make_purge_nodes(Nodes) -> + #purge_nodes{nodes = Nodes}. + -spec make_update_config(config()) -> protocol(). make_update_config(Config) -> #update_config{config = Config}. @@ -1532,6 +1556,39 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +all_nodes(#?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Nodes0 = maps:fold(fun({_, P}, _, Acc) -> + Acc#{node(P) => ok} + end, #{}, Cons0), + Nodes1 = maps:fold(fun(P, _, Acc) -> + Acc#{node(P) => ok} + end, Nodes0, Enqs0), + maps:keys( + lists:foldl(fun({{_, P}, _}, Acc) -> + Acc#{node(P) => ok} + end, Nodes1, WaitingConsumers0)). + +all_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, _}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + suspected_pids_for(Node, #?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 260e36d510..76cd194d91 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, handle_tick/2]). +-export([become_leader/2, handle_tick/3]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -243,7 +243,9 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, + {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, + Nodes) -> %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), @@ -266,7 +268,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_ready, MR}, {messages_unacknowledged, MU}, {reductions, R}]), - ok = repair_leader_record(QName, Self) + ok = repair_leader_record(QName, Self), + ExpectedNodes = rabbit_mnesia:cluster_nodes(all), + case Nodes -- ExpectedNodes of + [] -> + ok; + Stale -> + rabbit_log:info("~s: stale nodes detected. Purging ~w~n", + [rabbit_misc:rs(QName), Stale]), + %% pipeline purge command + {ok, Q} = rabbit_amqqueue:lookup(QName), + ok = ra:pipeline_command(amqqueue:get_pid(Q), + rabbit_fifo:make_purge_nodes(Stale)), + + ok + end end), ok. diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 6582104708..310553dc56 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -480,9 +480,12 @@ tick_test(_) -> {S3, {_, _}} = deq(4, Cid2, unsettled, S2), {S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3), - [{mod_call, _, _, + [{mod_call, rabbit_quorum_queue, handle_tick, [#resource{}, - {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4), + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}, + [_Node] + ]}, + {aux, emit}] = rabbit_fifo:tick(1, S4), ok. @@ -921,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) -> single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => + rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -1203,6 +1207,54 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. +purge_nodes_test(_) -> + Node = purged@node, + ThisNode = node(), + EnqPid = test_util:fake_pid(Node), + EnqPid2 = test_util:fake_pid(node()), + ConPid = test_util:fake_pid(Node), + Cid = {<<"tag">>, ConPid}, + % WaitingPid = test_util:fake_pid(Node), + + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + single_active_consumer_on => false}), + {State1, _, _} = apply(meta(1), + rabbit_fifo:make_enqueue(EnqPid, 1, msg1), + State0), + {State2, _, _} = apply(meta(2), + rabbit_fifo:make_enqueue(EnqPid2, 1, msg2), + State1), + {State3, _} = check(Cid, 3, 1000, State2), + {State4, _, _} = apply(meta(4), + {down, EnqPid, noconnection}, + State3), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode, Node] + ]}, + {aux, emit}] , rabbit_fifo:tick(1, State4)), + %% assert there are both enqueuers and consumers + {State, _, _} = apply(meta(5), + rabbit_fifo:make_purge_nodes([Node]), + State4), + + %% assert there are no enqueuers nor consumers + ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1, + State), + + ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0, + State), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode] + ]}, + {aux, emit}] , rabbit_fifo:tick(1, State)), + ok. + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index f281d15795..d4ae417a78 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -54,7 +54,7 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), - meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(), |
