diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-02-08 09:32:36 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-08 09:32:36 +0000 |
| commit | fcac9ca6e9ae011158bf12ded8ec3671b4b5c3a1 (patch) | |
| tree | 0b41452095982f7ea39392780543187d73263269 /src | |
| parent | 8f0bf2e26b8409d7d8bf13075e871e2d39c5516f (diff) | |
| parent | ea122fb064c1420120f689ed469a5ae908db898f (diff) | |
| download | rabbitmq-server-git-fcac9ca6e9ae011158bf12ded8ec3671b4b5c3a1.tar.gz | |
Merge pull request #1871 from rabbitmq/leader-tracking
QQ: Repair amqqrecord leader info on tick
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 36 |
2 files changed, 29 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 450ba1ac85..1fe0b38cd9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -660,7 +660,7 @@ tick(_Ts, #state{name = Name, EnqueueBytes, CheckoutBytes}, [{mod_call, rabbit_quorum_queue, - update_metrics, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#state{consumers = Cons, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 8a6d7eee5b..d0d464cda3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, update_metrics/2]). +-export([become_leader/2, handle_tick/2]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -62,8 +62,9 @@ single_active_consumer_ctag ]). --define(TICK_TIME, 1000). %% the ra server tick time --define(DELETE_TIMEOUT, 5000). %% the ra server tick time +-define(RPC_TIMEOUT, 1000). +-define(TICK_TIMEOUT, 5000). %% the ra server tick time +-define(DELETE_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> friendly_name => FName, initial_members => ServerIds, log_init_args => #{uid => UId}, + tick_timeout => ?TICK_TIMEOUT, machine => RaMachine} end || ServerId <- ServerIds], @@ -220,7 +222,7 @@ become_leader(QName, Name) -> {ok, Q0} when ?is_amqqueue(Q0) -> Nodes = amqqueue:get_quorum_nodes(Q0), [rpc:call(Node, ?MODULE, rpc_delete_metrics, - [QName], ?TICK_TIME) + [QName], ?RPC_TIMEOUT) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -232,7 +234,7 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> %% this makes calls to remote processes so cannot be run inside the %% ra server _ = spawn(fun() -> @@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_unacknowledged, MU}, {reductions, R}]) end), + ok = repair_leader_record(QName), ok. +repair_leader_record(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + Node = node(), + case amqqueue:get_pid(Q) of + {_, Node} -> + %% it's ok - we don't need to do anything + ok; + _ -> + rabbit_log:debug("~s: repairing leader record", + [rabbit_misc:rs(QName)]), + {_, Name} = erlang:process_info(self(), registered_name), + become_leader(QName, Name) + end, + ok. + + + reductions(Name) -> try {reductions, R} = process_info(whereis(Name), reductions), @@ -343,7 +363,7 @@ delete(Q, end, ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?TICK_TIME), + ?RPC_TIMEOUT), {ok, ReadyMsgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) -> i(state, Q) when ?is_amqqueue(Q) -> {Name, Node} = amqqueue:get_pid(Q), %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of {badrpc, _} -> down; State -> State end; @@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). -spec quorum_messages(atom()) -> non_neg_integer(). |
