diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 1 |
2 files changed, 27 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d0b56961ec..7c7eba257d 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, overview(#?MODULE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, + enqueue_count = EnqCount, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> + Conf = #{name => Cfg#cfg.name, + resource => Cfg#cfg.resource, + release_cursor_interval => Cfg#cfg.release_cursor_interval, + dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + max_length => Cfg#cfg.max_length, + max_bytes => Cfg#cfg.max_bytes, + consumer_strategy => Cfg#cfg.consumer_strategy, + max_in_memory_length => Cfg#cfg.max_in_memory_length, + max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes}, #{type => ?MODULE, + config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), + release_crusor_enqueue_counter => EnqCount, enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx, %% the incoming enqueue must already have been dropped State0; true -> - State = convert_prefix_msgs(State0), - {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end), - rabbit_log:info("dehydrating state took ~bms", [Time div 1000]), + Interval = case Base of + 0 -> 0; + _ -> + Total = messages_total(State0), + min(max(Total, Base), + ?RELEASE_CURSOR_EVERY_MAX) + end, + State = convert_prefix_msgs( + State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}), + Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - Interval = lqueue:len(Cursors) * Base, - State#?MODULE{release_cursors = Cursors, - cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}} + State#?MODULE{release_cursors = Cursors} end; maybe_store_dehydrated_state(RaftIdx, #?MODULE{cfg = diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index b9e967cbb1..2a8899d593 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -68,6 +68,7 @@ % represents a partially applied module call -define(RELEASE_CURSOR_EVERY, 64000). +-define(RELEASE_CURSOR_EVERY_MAX, 3200000). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, |
