summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl4
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl18
2 files changed, 22 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index cbd6abc843..9f2ffcd30a 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -35,6 +35,7 @@
query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
+ query_oldest_message_timestamp/1,
query_consumer_count/1,
query_consumers/1,
query_stat/1,
@@ -869,6 +870,9 @@ query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
RaIndexes.
+query_oldest_message_timestamp(#?MODULE{ra_indexes = RaIndexes}) ->
+ rabbit_fifo_index:smallest_value(RaIndexes).
+
query_consumer_count(#?MODULE{consumers = Consumers,
waiting_consumers = WaitingConsumers}) ->
Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) ->
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 9075ec1df1..2fa6edbc48 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -63,6 +63,8 @@
notify_decorators/3,
spawn_notify_decorators/3]).
+-export([get_oldest_message_timestamp/1]).
+
-export([is_enabled/0,
declare/2]).
@@ -1595,6 +1597,22 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
_ -> ok
end.
+
+-spec get_oldest_message_timestamp(amqqueue:amqqueue()) ->
+ {ok, non_neg_integer() | undefined | ''} |
+ {error, term()}.
+get_oldest_message_timestamp(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_oldest_message_timestamp/1) of
+ {ok, {_, Timestamp}, _} ->
+ {ok, Timestamp};
+ {error, _} = Err ->
+ Err;
+ {timeout, _} ->
+ {error, timeout}
+ end.
+
notify_decorators(QName, Event) ->
notify_decorators(QName, Event, []).