summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-10-25 08:36:48 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-10-25 08:36:48 +0300
commit8687c1bb5b36bce4a2ce781bc4aae2d7f10dfe9e (patch)
treec55148f3d0e8a4606d4e7606d3daf034e3f7dd49 /src
parent2fd609388022681995b9263cbedf5fa58e57c302 (diff)
downloadrabbitmq-server-git-8687c1bb5b36bce4a2ce781bc4aae2d7f10dfe9e.tar.gz
Make rabbit_amqueue:is_unresponsive/2 QQ aware
References rabbitmq/rabbitmq-cli#386.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_fifo_client.erl19
-rw-r--r--src/rabbit_quorum_queue.erl14
3 files changed, 39 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff83bc9d04..02a20367d9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1073,7 +1073,7 @@ map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) ->
false;
-is_unresponsive(Q, Timeout) ->
+is_unresponsive(Q, Timeout) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
try
delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}),
@@ -1082,6 +1082,17 @@ is_unresponsive(Q, Timeout) ->
%% TODO catch any exit??
exit:{timeout, _} ->
true
+ end;
+is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) ->
+ try
+ case rabbit_fifo_client:stat(Q, Timeout) of
+ {ok, _, _} -> false;
+ {timeout, _} -> true;
+ {error, _} -> true
+ end
+ catch
+ exit:{timeout, _} ->
+ true
end.
format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index afd7926d80..46064a52dc 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -40,7 +40,8 @@
cluster_name/1,
update_machine_state/2,
pending_size/1,
- stat/1
+ stat/1,
+ stat/2
]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -419,8 +420,19 @@ pending_size(#state{pending = Pend}) ->
stat(Leader) ->
%% short timeout as we don't want to spend too long if it is going to
%% fail anyway
- {ok, {_, {R, C}}, _} = ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250),
- {ok, R, C}.
+ stat(Leader, 250).
+
+-spec stat(ra:server_id(), non_neg_integer()) ->
+ {ok, non_neg_integer(), non_neg_integer()}
+ | {error | timeout, term()}.
+stat(Leader, Timeout) ->
+ %% short timeout as we don't want to spend too long if it is going to
+ %% fail anyway
+ case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, Timeout) of
+ {ok, {_, {R, C}}, _} -> {ok, R, C};
+ {error, _} = Error -> Error;
+ {timeout, _} = Error -> Error
+ end.
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
@@ -771,4 +783,3 @@ find_leader([Server | Servers]) ->
_ ->
find_leader(Servers)
end.
-
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 7e013fd725..33bae0919f 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -18,7 +18,7 @@
-export([init_state/2, handle_event/2]).
-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
--export([info/1, info/2, stat/1, infos/1]).
+-export([info/1, info/2, stat/1, stat/2, infos/1]).
-export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]).
-export([credit/4]).
-export([purge/1]).
@@ -586,9 +586,19 @@ info(Q, Items) ->
-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
stat(Q) when ?is_amqqueue(Q) ->
+ %% same short default timeout as in rabbit_fifo_client:stat/1
+ stat(Q, 250).
+
+-spec stat(amqqueue:amqqueue(), non_neg_integer()) -> {'ok', non_neg_integer(), non_neg_integer()}.
+
+stat(Q, Timeout) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
try
- {ok, _, _} = rabbit_fifo_client:stat(Leader)
+ case rabbit_fifo_client:stat(Leader, Timeout) of
+ {ok, _, _} = Success -> Success;
+ {error, _} -> {ok, 0, 0};
+ {timeout, _} -> {ok, 0, 0}
+ end
catch
_:_ ->
%% Leader is not available, cluster might be in minority