summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2019-02-08 09:32:36 +0000
committerGitHub <noreply@github.com>2019-02-08 09:32:36 +0000
commitfcac9ca6e9ae011158bf12ded8ec3671b4b5c3a1 (patch)
tree0b41452095982f7ea39392780543187d73263269 /src
parent8f0bf2e26b8409d7d8bf13075e871e2d39c5516f (diff)
parentea122fb064c1420120f689ed469a5ae908db898f (diff)
downloadrabbitmq-server-git-fcac9ca6e9ae011158bf12ded8ec3671b4b5c3a1.tar.gz
Merge pull request #1871 from rabbitmq/leader-tracking
QQ: Repair amqqueue record leader info on tick
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl2
-rw-r--r--src/rabbit_quorum_queue.erl36
2 files changed, 29 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 450ba1ac85..1fe0b38cd9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -660,7 +660,7 @@ tick(_Ts, #state{name = Name,
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
- update_metrics, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 8a6d7eee5b..d0d464cda3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, update_metrics/2]).
+-export([become_leader/2, handle_tick/2]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -62,8 +62,9 @@
single_active_consumer_ctag
]).
--define(TICK_TIME, 1000). %% the ra server tick time
--define(DELETE_TIMEOUT, 5000). %% the ra server tick time
+-define(RPC_TIMEOUT, 1000).
+-define(TICK_TIMEOUT, 5000). %% the ra server tick time
+-define(DELETE_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
@@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
friendly_name => FName,
initial_members => ServerIds,
log_init_args => #{uid => UId},
+ tick_timeout => ?TICK_TIMEOUT,
machine => RaMachine}
end || ServerId <- ServerIds],
@@ -220,7 +222,7 @@ become_leader(QName, Name) ->
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = amqqueue:get_quorum_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
- [QName], ?TICK_TIME)
+ [QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -232,7 +234,7 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
_ = spawn(fun() ->
@@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_unacknowledged, MU},
{reductions, R}])
end),
+ ok = repair_leader_record(QName),
ok.
+repair_leader_record(QName) -> %% Re-runs become_leader/2 when the pid stored in the queue record does not name the local node; returns ok.
+ {ok, Q} = rabbit_amqqueue:lookup(QName), %% assertive match: crashes with badmatch if the lookup does not return {ok, Q}
+ Node = node(),
+ case amqqueue:get_pid(Q) of
+ {_, Node} -> %% Node is already bound, so this clause matches only when the stored pid's second element is the local node
+ %% the stored record already points at this node - nothing to repair
+ ok;
+ _ ->
+ rabbit_log:debug("~s: repairing leader record",
+ [rabbit_misc:rs(QName)]),
+ {_, Name} = erlang:process_info(self(), registered_name), %% registered name of the calling process; NOTE(review): badmatch if the caller has no registered name - confirm callers are always registered
+ become_leader(QName, Name)
+ end,
+ ok. %% the result of the case expression is discarded; ok is returned on every non-crashing path
+
+
+
reductions(Name) ->
try
{reductions, R} = process_info(whereis(Name), reductions),
@@ -343,7 +363,7 @@ delete(Q,
end,
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- ?TICK_TIME),
+ ?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
end;
@@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
-spec quorum_messages(atom()) -> non_neg_integer().