diff options
| -rw-r--r-- | src/rabbit_fifo_client.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 51 |
2 files changed, 38 insertions, 23 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 9cdb1dfbe7..5e5ad105e4 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -714,7 +714,11 @@ resend_command(Node, Correlation, Command, ok = ra:pipeline_command(Node, Command, Seq), State#state{pending = Pending#{Seq => {Correlation, Command}}}. -add_command(_Cid, _Tag, [], Acc) -> +add_command(_, _, [], Acc) -> Acc; -add_command(Cid, Tag, MsgIds, Acc) -> - [{Tag, MsgIds, Cid} | Acc]. +add_command(Cid, settle, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, return, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, discard, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 2394822763..963ef7df01 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -82,6 +82,8 @@ open_files ]). +-define(TICK_TIME, 1000). %% the ra server tick time + %%---------------------------------------------------------------------------- -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> @@ -185,7 +187,8 @@ become_leader(QName, Name) -> end), case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{quorum_nodes = Nodes}} -> - [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName]) + [rpc:call(Node, ?MODULE, rpc_delete_metrics, + [QName], ?TICK_TIME) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -198,22 +201,29 @@ rpc_delete_metrics(QName) -> ok. update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> - R = reductions(Name), - rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), - Util = case C of - 0 -> 0; - _ -> rabbit_fifo:usage(Name) - end, - Infos = [{consumers, C}, {consumer_utilisation, Util}, - {message_bytes_ready, MsgBytesReady}, - {message_bytes_unacknowledged, MsgBytesUnack}, - {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)], - rabbit_core_metrics:queue_stats(QName, Infos), - rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, - {messages, M}, - {messages_ready, MR}, - {messages_unacknowledged, MU}, - {reductions, R}]). + %% this makes calls to remote processes so cannot be run inside the + %% ra server + _ = spawn(fun() -> + R = reductions(Name), + rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), + Util = case C of + 0 -> 0; + _ -> rabbit_fifo:usage(Name) + end, + Infos = [{consumers, C}, {consumer_utilisation, Util}, + {message_bytes_ready, MsgBytesReady}, + {message_bytes_unacknowledged, MsgBytesUnack}, + {message_bytes, MsgBytesReady + MsgBytesUnack} + | infos(QName)], + rabbit_core_metrics:queue_stats(QName, Infos), + rabbit_event:notify(queue_stats, + Infos ++ [{name, QName}, + {messages, M}, + {messages_ready, MR}, + {messages_unacknowledged, MU}, + {reductions, R}]) + end), + ok. reductions(Name) -> try @@ -286,7 +296,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, {'DOWN', MRef, process, _, _} -> ok end, - rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]), + rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?TICK_TIME), {ok, Msgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -657,7 +668,7 @@ i(memory, #amqqueue{pid = {Name, _}}) -> end; i(state, #amqqueue{pid = {Name, Node}}) -> %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name]) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of {badrpc, _} -> down; State -> State end; @@ -706,7 +717,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). quorum_messages(QName) -> case ets:lookup(queue_coarse_metrics, QName) of |
