summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-07 13:47:12 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-07 16:39:19 +0000
commitea122fb064c1420120f689ed469a5ae908db898f (patch)
treea5bb74d9dc938d4a4ac75e5a1d7601dbc7da36ca /src
parent723dcfecd1b8c46d985e671bb55789c501b7812d (diff)
downloadrabbitmq-server-git-ea122fb064c1420120f689ed469a5ae908db898f.tar.gz
QQ: Repair amqqrecord leader info on tick
It is possible that the initial update could fail and then the leader would never be updated. Here we check periodically if the amqqueue record leader is incorrect and update it if so. [#163554015]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl2
-rw-r--r--src/rabbit_quorum_queue.erl36
2 files changed, 29 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 450ba1ac85..1fe0b38cd9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -660,7 +660,7 @@ tick(_Ts, #state{name = Name,
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
- update_metrics, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 8a6d7eee5b..d0d464cda3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, update_metrics/2]).
+-export([become_leader/2, handle_tick/2]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -62,8 +62,9 @@
single_active_consumer_ctag
]).
--define(TICK_TIME, 1000). %% the ra server tick time
--define(DELETE_TIMEOUT, 5000). %% the ra server tick time
+-define(RPC_TIMEOUT, 1000).
+-define(TICK_TIMEOUT, 5000). %% the ra server tick time
+-define(DELETE_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
@@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
friendly_name => FName,
initial_members => ServerIds,
log_init_args => #{uid => UId},
+ tick_timeout => ?TICK_TIMEOUT,
machine => RaMachine}
end || ServerId <- ServerIds],
@@ -220,7 +222,7 @@ become_leader(QName, Name) ->
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = amqqueue:get_quorum_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
- [QName], ?TICK_TIME)
+ [QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -232,7 +234,7 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
_ = spawn(fun() ->
@@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_unacknowledged, MU},
{reductions, R}])
end),
+ ok = repair_leader_record(QName),
ok.
+repair_leader_record(QName) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ Node = node(),
+ case amqqueue:get_pid(Q) of
+ {_, Node} ->
+ %% it's ok - we don't need to do anything
+ ok;
+ _ ->
+ rabbit_log:debug("~s: repairing leader record",
+ [rabbit_misc:rs(QName)]),
+ {_, Name} = erlang:process_info(self(), registered_name),
+ become_leader(QName, Name)
+ end,
+ ok.
+
+
+
reductions(Name) ->
try
{reductions, R} = process_info(whereis(Name), reductions),
@@ -343,7 +363,7 @@ delete(Q,
end,
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- ?TICK_TIME),
+ ?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
end;
@@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
-spec quorum_messages(atom()) -> non_neg_integer().