summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-03-18 08:06:59 +0000
committerkjnilsson <knilsson@pivotal.io>2019-03-27 16:44:42 +0000
commit9dfa931d3275513da2d63e2913eef94d2c936311 (patch)
treebf4da8fd710f55f9e50eb1568df09d2db378f029 /src
parentc722c96ee6a9afe6d2fcf972e32f9ed1f6251291 (diff)
downloadrabbitmq-server-git-9dfa931d3275513da2d63e2913eef94d2c936311.tar.gz
Purge stale node state from quorum queues
When a node is disconnected then removed from the RabbitMQ cluster it is possibly that quorum queues retain some state for this node. This change purges any enqueuer or consumer state for pids relating to nodes that are not in the RabbitMQ cluster. [#164214265]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl103
-rw-r--r--src/rabbit_quorum_queue.erl22
2 files changed, 99 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index fb6f9ce770..1d53061a8a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -62,6 +62,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
+ make_purge_nodes/1,
make_update_config/1
]).
@@ -83,6 +84,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}).
-opaque protocol() ::
@@ -93,6 +95,7 @@
#discard{} |
#credit{} |
#purge{} |
+ #purge_nodes{} |
#update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
@@ -396,28 +399,9 @@ apply(Meta, {down, Pid, noconnection},
_ ->
[{monitor, node, Node}]
end ++ Effects1,
- %% TODO: should we run a checkout here?
checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
-apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
- % Remove any enqueuer for the same pid and enqueue any pending messages
- % This should be ok as we won't see any more enqueues from this pid
- State1 = case maps:take(Pid, Enqs0) of
- {#enqueuer{pending = Pend}, Enqs} ->
- lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
- enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
- error ->
- State0
- end,
- {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
- % return checked out messages to main queue
- % Find the consumers for the down pid
- DownConsumers = maps:keys(
- maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
- end, {State2, Effects1}, DownConsumers),
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Pid, State0),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
@@ -448,16 +432,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ State1 = State0#?MODULE{consumers = Cons1,
+ enqueuers = Enqs1,
service_queue = SQ,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State1, Effects1),
checkout(Meta, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
+purge_node(Node, State, Effects) ->
+ lists:foldl(fun(Pid, {S0, E0}) ->
+ {S, E} = handle_down(Pid, S0),
+ {S, E0 ++ E}
+ end, {State, Effects}, all_pids_for(Node, State)).
+
+%% any downs that re not noconnection
+handle_down(Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
@@ -556,8 +574,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
+ %% TODO: call a handler that works out if any known nodes need to be
+ %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -1495,6 +1515,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+ #purge_nodes{nodes = Nodes}.
+
-spec make_update_config(config()) -> protocol().
make_update_config(Config) ->
#update_config{config = Config}.
@@ -1532,6 +1556,39 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+all_nodes(#?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, #{}, Cons0),
+ Nodes1 = maps:fold(fun(P, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes0, Enqs0),
+ maps:keys(
+ lists:foldl(fun({{_, P}, _}, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes1, WaitingConsumers0)).
+
+all_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P}, _}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+
suspected_pids_for(Node, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 260e36d510..76cd194d91 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/2]).
+-export([become_leader/2, handle_tick/3]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -243,7 +243,9 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName,
+ {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
+ Nodes) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
@@ -266,7 +268,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_ready, MR},
{messages_unacknowledged, MU},
{reductions, R}]),
- ok = repair_leader_record(QName, Self)
+ ok = repair_leader_record(QName, Self),
+ ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
+ case Nodes -- ExpectedNodes of
+ [] ->
+ ok;
+ Stale ->
+ rabbit_log:info("~s: stale nodes detected. Purging ~w~n",
+ [rabbit_misc:rs(QName), Stale]),
+ %% pipeline purge command
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ok = ra:pipeline_command(amqqueue:get_pid(Q),
+ rabbit_fifo:make_purge_nodes(Stale)),
+
+ ok
+ end
end),
ok.