summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl103
-rw-r--r--src/rabbit_quorum_queue.erl22
-rw-r--r--test/rabbit_fifo_SUITE.erl64
-rw-r--r--test/rabbit_fifo_int_SUITE.erl2
4 files changed, 158 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index fb6f9ce770..1d53061a8a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -62,6 +62,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
+ make_purge_nodes/1,
make_update_config/1
]).
@@ -83,6 +84,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}).
-opaque protocol() ::
@@ -93,6 +95,7 @@
#discard{} |
#credit{} |
#purge{} |
+ #purge_nodes{} |
#update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
@@ -396,28 +399,9 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- %% TODO: should we run a checkout here?
checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
- % Remove any enqueuer for the same pid and enqueue any pending messages
- % This should be ok as we won't see any more enqueues from this pid
- State1 = case maps:take(Pid, Enqs0) of
- {#enqueuer{pending = Pend}, Enqs} ->
- lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
- enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
- error ->
- State0
- end,
- {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
- % return checked out messages to main queue
- % Find the consumers for the down pid
- DownConsumers = maps:keys(
- maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
- end, {State2, Effects1}, DownConsumers),
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Pid, State0),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
@@ -448,16 +432,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ State1 = State0#?MODULE{consumers = Cons1,
+ enqueuers = Enqs1,
service_queue = SQ,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State1, Effects1),
checkout(Meta, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
+purge_node(Node, State, Effects) ->
+ lists:foldl(fun(Pid, {S0, E0}) ->
+ {S, E} = handle_down(Pid, S0),
+ {S, E0 ++ E}
+ end, {State, Effects}, all_pids_for(Node, State)).
+
+%% any downs that re not noconnection
+handle_down(Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
@@ -556,8 +574,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
+ %% TODO: call a handler that works out if any known nodes need to be
+ %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -1495,6 +1515,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+ #purge_nodes{nodes = Nodes}.
+
-spec make_update_config(config()) -> protocol().
make_update_config(Config) ->
#update_config{config = Config}.
@@ -1532,6 +1556,39 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+all_nodes(#?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, #{}, Cons0),
+ Nodes1 = maps:fold(fun(P, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes0, Enqs0),
+ maps:keys(
+ lists:foldl(fun({{_, P}, _}, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes1, WaitingConsumers0)).
+
+all_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P}, _}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+
suspected_pids_for(Node, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 260e36d510..76cd194d91 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/2]).
+-export([become_leader/2, handle_tick/3]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -243,7 +243,9 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName,
+ {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
+ Nodes) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
@@ -266,7 +268,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_ready, MR},
{messages_unacknowledged, MU},
{reductions, R}]),
- ok = repair_leader_record(QName, Self)
+ ok = repair_leader_record(QName, Self),
+ ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
+ case Nodes -- ExpectedNodes of
+ [] ->
+ ok;
+ Stale ->
+ rabbit_log:info("~s: stale nodes detected. Purging ~w~n",
+ [rabbit_misc:rs(QName), Stale]),
+ %% pipeline purge command
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ok = ra:pipeline_command(amqqueue:get_pid(Q),
+ rabbit_fifo:make_purge_nodes(Stale)),
+
+ ok
+ end
end),
ok.
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 6582104708..310553dc56 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -480,9 +480,12 @@ tick_test(_) ->
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
- [{mod_call, _, _,
+ [{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{},
- {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4),
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+ [_Node]
+ ]},
+ {aux, emit}] = rabbit_fifo:tick(1, S4),
ok.
@@ -921,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) ->
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource =>
+ rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -1203,6 +1207,54 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+purge_nodes_test(_) ->
+ Node = purged@node,
+ ThisNode = node(),
+ EnqPid = test_util:fake_pid(Node),
+ EnqPid2 = test_util:fake_pid(node()),
+ ConPid = test_util:fake_pid(Node),
+ Cid = {<<"tag">>, ConPid},
+ % WaitingPid = test_util:fake_pid(Node),
+
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ single_active_consumer_on => false}),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo:make_enqueue(EnqPid, 1, msg1),
+ State0),
+ {State2, _, _} = apply(meta(2),
+ rabbit_fifo:make_enqueue(EnqPid2, 1, msg2),
+ State1),
+ {State3, _} = check(Cid, 3, 1000, State2),
+ {State4, _, _} = apply(meta(4),
+ {down, EnqPid, noconnection},
+ State3),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode, Node]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State4)),
+ %% assert there are both enqueuers and consumers
+ {State, _, _} = apply(meta(5),
+ rabbit_fifo:make_purge_nodes([Node]),
+ State4),
+
+ %% assert there are no enqueuers nor consumers
+ ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1,
+ State),
+
+ ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
+ State),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State)),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index f281d15795..d4ae417a78 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -54,7 +54,7 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
- meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),