summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo_client.erl10
-rw-r--r--src/rabbit_quorum_queue.erl51
2 files changed, 38 insertions, 23 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 9cdb1dfbe7..5e5ad105e4 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -714,7 +714,11 @@ resend_command(Node, Correlation, Command,
ok = ra:pipeline_command(Node, Command, Seq),
State#state{pending = Pending#{Seq => {Correlation, Command}}}.
-add_command(_Cid, _Tag, [], Acc) ->
+add_command(_, _, [], Acc) ->
Acc;
-add_command(Cid, Tag, MsgIds, Acc) ->
- [{Tag, MsgIds, Cid} | Acc].
+add_command(Cid, settle, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
+add_command(Cid, return, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
+add_command(Cid, discard, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 2394822763..963ef7df01 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -82,6 +82,8 @@
open_files
]).
+-define(TICK_TIME, 1000). %% the ra server tick time
+
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
@@ -185,7 +187,8 @@ become_leader(QName, Name) ->
end),
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{quorum_nodes = Nodes}} ->
- [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName])
+ [rpc:call(Node, ?MODULE, rpc_delete_metrics,
+ [QName], ?TICK_TIME)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -198,22 +201,29 @@ rpc_delete_metrics(QName) ->
ok.
update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
- R = reductions(Name),
- rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
- Util = case C of
- 0 -> 0;
- _ -> rabbit_fifo:usage(Name)
- end,
- Infos = [{consumers, C}, {consumer_utilisation, Util},
- {message_bytes_ready, MsgBytesReady},
- {message_bytes_unacknowledged, MsgBytesUnack},
- {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
- rabbit_core_metrics:queue_stats(QName, Infos),
- rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
- {messages, M},
- {messages_ready, MR},
- {messages_unacknowledged, MU},
- {reductions, R}]).
+ %% this makes calls to remote processes so cannot be run inside the
+ %% ra server
+ _ = spawn(fun() ->
+ R = reductions(Name),
+ rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
+ Util = case C of
+ 0 -> 0;
+ _ -> rabbit_fifo:usage(Name)
+ end,
+ Infos = [{consumers, C}, {consumer_utilisation, Util},
+ {message_bytes_ready, MsgBytesReady},
+ {message_bytes_unacknowledged, MsgBytesUnack},
+ {message_bytes, MsgBytesReady + MsgBytesUnack}
+ | infos(QName)],
+ rabbit_core_metrics:queue_stats(QName, Infos),
+ rabbit_event:notify(queue_stats,
+ Infos ++ [{name, QName},
+ {messages, M},
+ {messages_ready, MR},
+ {messages_unacknowledged, MU},
+ {reductions, R}])
+ end),
+ ok.
reductions(Name) ->
try
@@ -286,7 +296,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
{'DOWN', MRef, process, _, _} ->
ok
end,
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
+ rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
+ ?TICK_TIME),
{ok, Msgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -657,7 +668,7 @@ i(memory, #amqqueue{pid = {Name, _}}) ->
end;
i(state, #amqqueue{pid = {Name, Node}}) ->
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name]) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
{badrpc, _} -> down;
State -> State
end;
@@ -706,7 +717,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
quorum_messages(QName) ->
case ets:lookup(queue_coarse_metrics, QName) of