diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-03-18 08:06:59 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-03-27 16:44:42 +0000 |
| commit | 9dfa931d3275513da2d63e2913eef94d2c936311 (patch) | |
| tree | bf4da8fd710f55f9e50eb1568df09d2db378f029 /src | |
| parent | c722c96ee6a9afe6d2fcf972e32f9ed1f6251291 (diff) | |
| download | rabbitmq-server-git-9dfa931d3275513da2d63e2913eef94d2c936311.tar.gz | |
Purge stale node state from quorum queues
When a node is disconnected and then removed from the RabbitMQ cluster, it is
possible that quorum queues retain some state for this node. This change
purges any enqueuer or consumer state for pids relating to nodes that
are not in the RabbitMQ cluster.
[#164214265]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 22 |
2 files changed, 99 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fb6f9ce770..1d53061a8a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -62,6 +62,7 @@ make_discard/2, make_credit/4, make_purge/0, + make_purge_nodes/1, make_update_config/1 ]). @@ -83,6 +84,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). +-record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). -opaque protocol() :: @@ -93,6 +95,7 @@ #discard{} | #credit{} | #purge{} | + #purge_nodes{} | #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). @@ -396,28 +399,9 @@ apply(Meta, {down, Pid, noconnection}, _ -> [{monitor, node, Node}] end ++ Effects1, - %% TODO: should we run a checkout here? checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); -apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, - {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), - % return checked out messages to main queue - % Find the consumers for the down pid - DownConsumers = maps:keys( - maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E, down) - end, {State2, Effects1}, DownConsumers), +apply(Meta, {down, Pid, _Info}, State0) -> + {State, Effects} = handle_down(Pid, State0), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, @@ -448,16 +432,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), Waiting = 
update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + State1 = State0#?MODULE{consumers = Cons1, + enqueuers = Enqs1, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; +apply(_, #purge_nodes{nodes = Nodes}, State0) -> + {State, Effects} = lists:foldl(fun(Node, {S, E}) -> + purge_node(Node, S, E) + end, {State0, []}, Nodes), + {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). +purge_node(Node, State, Effects) -> + lists:foldl(fun(Pid, {S0, E0}) -> + {S, E} = handle_down(Pid, S0), + {S, E0 ++ E} + end, {State, Effects}, all_pids_for(Node, State)). + +%% any downs that re not noconnection +handle_down(Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + % Remove any enqueuer for the same pid and enqueue any pending messages + % This should be ok as we won't see any more enqueues from this pid + State1 = case maps:take(Pid, Enqs0) of + {#enqueuer{pending = Pend}, Enqs} -> + lists:foldl(fun ({_, RIdx, RawMsg}, S) -> + enqueue(RIdx, RawMsg, S) + end, State0#?MODULE{enqueuers = Enqs}, Pend); + error -> + State0 + end, + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), + % return checked out messages to main queue + % Find the consumers for the down pid + DownConsumers = maps:keys( + maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), + lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E, down) + end, {State2, Effects1}, DownConsumers). 
+ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, @@ -556,8 +574,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, + %% TODO: call a handler that works out if any known nodes need to be + %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, - handle_tick, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, @@ -1495,6 +1515,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. +-spec make_purge_nodes([node()]) -> protocol(). +make_purge_nodes(Nodes) -> + #purge_nodes{nodes = Nodes}. + -spec make_update_config(config()) -> protocol(). make_update_config(Config) -> #update_config{config = Config}. @@ -1532,6 +1556,39 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +all_nodes(#?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Nodes0 = maps:fold(fun({_, P}, _, Acc) -> + Acc#{node(P) => ok} + end, #{}, Cons0), + Nodes1 = maps:fold(fun(P, _, Acc) -> + Acc#{node(P) => ok} + end, Nodes0, Enqs0), + maps:keys( + lists:foldl(fun({{_, P}, _}, Acc) -> + Acc#{node(P) => ok} + end, Nodes1, WaitingConsumers0)). 
+ +all_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, _}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + suspected_pids_for(Node, #?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 260e36d510..76cd194d91 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, handle_tick/2]). +-export([become_leader/2, handle_tick/3]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -243,7 +243,9 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, + {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, + Nodes) -> %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), @@ -266,7 +268,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_ready, MR}, {messages_unacknowledged, MU}, {reductions, R}]), - ok = repair_leader_record(QName, Self) + ok = repair_leader_record(QName, Self), + ExpectedNodes = rabbit_mnesia:cluster_nodes(all), + case Nodes -- ExpectedNodes of + [] -> + ok; + Stale -> + rabbit_log:info("~s: stale nodes detected. 
Purging ~w~n", + [rabbit_misc:rs(QName), Stale]), + %% pipeline purge command + {ok, Q} = rabbit_amqqueue:lookup(QName), + ok = ra:pipeline_command(amqqueue:get_pid(Q), + rabbit_fifo:make_purge_nodes(Stale)), + + ok + end end), ok. |
