diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-07 13:47:12 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-07 16:39:19 +0000 |
| commit | ea122fb064c1420120f689ed469a5ae908db898f (patch) | |
| tree | a5bb74d9dc938d4a4ac75e5a1d7601dbc7da36ca /src | |
| parent | 723dcfecd1b8c46d985e671bb55789c501b7812d (diff) | |
| download | rabbitmq-server-git-ea122fb064c1420120f689ed469a5ae908db898f.tar.gz | |
QQ: Repair amqqrecord leader info on tick
It is possible that the initial update could fail and then the leader
would never be updated. Here we check periodically if the amqqueue
record leader is incorrect and update it if so.
[#163554015]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 36 |
2 files changed, 29 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 450ba1ac85..1fe0b38cd9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -660,7 +660,7 @@ tick(_Ts, #state{name = Name, EnqueueBytes, CheckoutBytes}, [{mod_call, rabbit_quorum_queue, - update_metrics, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#state{consumers = Cons, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 8a6d7eee5b..d0d464cda3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, update_metrics/2]). +-export([become_leader/2, handle_tick/2]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -62,8 +62,9 @@ single_active_consumer_ctag ]). --define(TICK_TIME, 1000). %% the ra server tick time --define(DELETE_TIMEOUT, 5000). %% the ra server tick time +-define(RPC_TIMEOUT, 1000). +-define(TICK_TIMEOUT, 5000). %% the ra server tick time +-define(DELETE_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> friendly_name => FName, initial_members => ServerIds, log_init_args => #{uid => UId}, + tick_timeout => ?TICK_TIMEOUT, machine => RaMachine} end || ServerId <- ServerIds], @@ -220,7 +222,7 @@ become_leader(QName, Name) -> {ok, Q0} when ?is_amqqueue(Q0) -> Nodes = amqqueue:get_quorum_nodes(Q0), [rpc:call(Node, ?MODULE, rpc_delete_metrics, - [QName], ?TICK_TIME) + [QName], ?RPC_TIMEOUT) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -232,7 +234,7 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> %% this makes calls to remote processes so cannot be run inside the %% ra server _ = spawn(fun() -> @@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_unacknowledged, MU}, {reductions, R}]) end), + ok = repair_leader_record(QName), ok. +repair_leader_record(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + Node = node(), + case amqqueue:get_pid(Q) of + {_, Node} -> + %% it's ok - we don't need to do anything + ok; + _ -> + rabbit_log:debug("~s: repairing leader record", + [rabbit_misc:rs(QName)]), + {_, Name} = erlang:process_info(self(), registered_name), + become_leader(QName, Name) + end, + ok. + + + reductions(Name) -> try {reductions, R} = process_info(whereis(Name), reductions), @@ -343,7 +363,7 @@ delete(Q, end, ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?TICK_TIME), + ?RPC_TIMEOUT), {ok, ReadyMsgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) -> i(state, Q) when ?is_amqqueue(Q) -> {Name, Node} = amqqueue:get_pid(Q), %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of {badrpc, _} -> down; State -> State end; @@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). -spec quorum_messages(atom()) -> non_neg_integer(). |
