diff options
-rw-r--r-- | src/rabbit_fifo.erl | 34 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 1 | ||||
-rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 21 |
3 files changed, 48 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d0b56961ec..7c7eba257d 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, overview(#?MODULE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, + enqueue_count = EnqCount, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> + Conf = #{name => Cfg#cfg.name, + resource => Cfg#cfg.resource, + release_cursor_interval => Cfg#cfg.release_cursor_interval, + dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + max_length => Cfg#cfg.max_length, + max_bytes => Cfg#cfg.max_bytes, + consumer_strategy => Cfg#cfg.consumer_strategy, + max_in_memory_length => Cfg#cfg.max_in_memory_length, + max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes}, #{type => ?MODULE, + config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), + release_crusor_enqueue_counter => EnqCount, enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx, %% the incoming enqueue must already have been dropped State0; true -> - State = convert_prefix_msgs(State0), - {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end), - rabbit_log:info("dehydrating state took ~bms", [Time div 1000]), + Interval = case Base of + 0 -> 0; + _ -> + Total = messages_total(State0), + min(max(Total, Base), + ?RELEASE_CURSOR_EVERY_MAX) + end, + State = convert_prefix_msgs( + State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}), + Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - Interval = lqueue:len(Cursors) * Base, - State#?MODULE{release_cursors = Cursors, - cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}} + State#?MODULE{release_cursors = Cursors} end; maybe_store_dehydrated_state(RaftIdx, #?MODULE{cfg = diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index b9e967cbb1..2a8899d593 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -68,6 +68,7 @@ % represents a partially applied module call -define(RELEASE_CURSOR_EVERY, 64000). +-define(RELEASE_CURSOR_EVERY_MAX, 3200000). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 2413e3391c..23522e71f9 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -45,6 +45,7 @@ all_tests() -> scenario19, scenario20, scenario21, + scenario22, single_active, single_active_01, single_active_02, @@ -376,6 +377,24 @@ scenario21(_Config) -> Commands), ok. +scenario22(_Config) -> + % C1Pid = c:pid(0,883,1), + % C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [ + make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + make_enqueue(E,4,<<"4">>), + make_enqueue(E,5,<<"5">>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + release_cursor_interval => 1, + max_length => 3, + dead_letter_handler => {?MODULE, banana, []}}, + Commands), + ok. + single_active_01(_Config) -> C1Pid = test_util:fake_pid(rabbit@fake_node1), C1 = {<<0>>, C1Pid}, @@ -1093,6 +1112,8 @@ run_proper(Fun, Args, NumTests) -> run_snapshot_test(Conf, Commands) -> %% create every incremental permutation of the commands lists %% and run the snapshot tests against that + ct:pal("running snapshot test with ~b commands using config ~p", + [length(Commands), Conf]), [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Conf, C) |