diff options
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 18 |
2 files changed, 22 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index cbd6abc843..9f2ffcd30a 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -35,6 +35,7 @@ query_messages_total/1, query_processes/1, query_ra_indexes/1, + query_oldest_message_timestamp/1, query_consumer_count/1, query_consumers/1, query_stat/1, @@ -869,6 +870,9 @@ query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) -> query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) -> RaIndexes. +query_oldest_message_timestamp(#?MODULE{ra_indexes = RaIndexes}) -> + rabbit_fifo_index:smallest_value(RaIndexes). + query_consumer_count(#?MODULE{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 9075ec1df1..2fa6edbc48 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -63,6 +63,8 @@ notify_decorators/3, spawn_notify_decorators/3]). +-export([get_oldest_message_timestamp/1]). + -export([is_enabled/0, declare/2]). @@ -1595,6 +1597,22 @@ notify_decorators(Q) when ?is_amqqueue(Q) -> _ -> ok end. + +-spec get_oldest_message_timestamp(amqqueue:amqqueue()) -> + {ok, non_neg_integer() | undefined | ''} | + {error, term()}. +get_oldest_message_timestamp(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + case ra:local_query(QPid, + fun rabbit_fifo:query_oldest_message_timestamp/1) of + {ok, {_, Timestamp}, _} -> + {ok, Timestamp}; + {error, _} = Err -> + Err; + {timeout, _} -> + {error, timeout} + end. + notify_decorators(QName, Event) -> notify_decorators(QName, Event, []). |