diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 14 |
3 files changed, 39 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff83bc9d04..02a20367d9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1073,7 +1073,7 @@ map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) -> false; -is_unresponsive(Q, Timeout) -> +is_unresponsive(Q, Timeout) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), try delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), @@ -1082,6 +1082,17 @@ is_unresponsive(Q, Timeout) -> %% TODO catch any exit?? exit:{timeout, _} -> true + end; +is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) -> + try + case rabbit_fifo_client:stat(Q, Timeout) of + {ok, _, _} -> false; + {timeout, _} -> true; + {error, _} -> true + end + catch + exit:{timeout, _} -> + true end. format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index afd7926d80..46064a52dc 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -40,7 +40,8 @@ cluster_name/1, update_machine_state/2, pending_size/1, - stat/1 + stat/1, + stat/2 ]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -419,8 +420,19 @@ pending_size(#state{pending = Pend}) -> stat(Leader) -> %% short timeout as we don't want to spend too long if it is going to %% fail anyway - {ok, {_, {R, C}}, _} = ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250), - {ok, R, C}. + stat(Leader, 250). + +-spec stat(ra:server_id(), non_neg_integer()) -> + {ok, non_neg_integer(), non_neg_integer()} + | {error | timeout, term()}. +stat(Leader, Timeout) -> + %% short timeout as we don't want to spend too long if it is going to + %% fail anyway + case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of + {ok, {_, {R, C}}, _} -> {ok, R, C}; + {error, _} = Error -> Error; + {timeout, _} = Error -> Error + end. %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). @@ -771,4 +783,3 @@ find_leader([Server | Servers]) -> _ -> find_leader(Servers) end. - diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 7e013fd725..33bae0919f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -18,7 +18,7 @@ -export([init_state/2, handle_event/2]). -export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]). --export([info/1, info/2, stat/1, infos/1]). +-export([info/1, info/2, stat/1, stat/2, infos/1]). -export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]). -export([credit/4]). -export([purge/1]). @@ -586,9 +586,19 @@ info(Q, Items) -> -spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. stat(Q) when ?is_amqqueue(Q) -> + %% same short default timeout as in rabbit_fifo_client:stat/1 + stat(Q, 250). + +-spec stat(amqqueue:amqqueue(), non_neg_integer()) -> {'ok', non_neg_integer(), non_neg_integer()}. + +stat(Q, Timeout) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), try - {ok, _, _} = rabbit_fifo_client:stat(Leader) + case rabbit_fifo_client:stat(Leader, Timeout) of + {ok, _, _} = Success -> Success; + {error, _} -> {ok, 0, 0}; + {timeout, _} -> {ok, 0, 0} + end catch _:_ -> %% Leader is not available, cluster might be in minority |
