summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-02-01 16:49:10 +0100
committerdcorbacho <dparracorbacho@piotal.io>2021-02-01 16:49:10 +0100
commitbfaea09df9f146f13a536e08d30a606172aea830 (patch)
tree5d110ecf91621e02855bf97143fa4d9fccd875fb
parente305a71a3353922c7350375b41eb352a0066fca3 (diff)
downloadrabbitmq-server-git-is-unresponsive-stream.tar.gz
Implement `is_unresponsive` for stream queuesis-unresponsive-stream
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl11
1 files changed, 11 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index dccc725a9d..3704cf6ecc 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -1207,6 +1207,17 @@ is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) ->
catch
exit:{timeout, _} ->
true
+ end;
+is_unresponsive(Q, Timeout) when ?amqqueue_is_stream(Q) ->
+ try
+ #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
+ case gen_batch_server:call(LeaderPid, get_reader_context, Timeout) of
+ #{dir := _} -> false;
+ _ -> true
+ end
+ catch
+ exit:{timeout, _} ->
+ true
end.
format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);