summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl34
-rw-r--r--src/rabbit_fifo.hrl1
2 files changed, 27 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d0b56961ec..7c7eba257d 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
overview(#?MODULE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
+ enqueue_count = EnqCount,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
+ Conf = #{name => Cfg#cfg.name,
+ resource => Cfg#cfg.resource,
+ release_cursor_interval => Cfg#cfg.release_cursor_interval,
+ dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
+ max_length => Cfg#cfg.max_length,
+ max_bytes => Cfg#cfg.max_bytes,
+ consumer_strategy => Cfg#cfg.consumer_strategy,
+ max_in_memory_length => Cfg#cfg.max_in_memory_length,
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
#{type => ?MODULE,
+ config => Conf,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
+ release_crusor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx,
%% the incoming enqueue must already have been dropped
State0;
true ->
- State = convert_prefix_msgs(State0),
- {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end),
- rabbit_log:info("dehydrating state took ~bms", [Time div 1000]),
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base),
+ ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = convert_prefix_msgs(
+ State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}),
+ Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
- Interval = lqueue:len(Cursors) * Base,
- State#?MODULE{release_cursors = Cursors,
- cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}
+ State#?MODULE{release_cursors = Cursors}
end;
maybe_store_dehydrated_state(RaftIdx,
#?MODULE{cfg =
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index b9e967cbb1..2a8899d593 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -68,6 +68,7 @@
% represents a partially applied module call
-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,