diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-03-05 11:55:28 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-03-05 11:55:28 +0000 |
commit | 34f3a0066e050f3eaaebf67d73db462e4262b699 (patch) | |
tree | e3dca0d8d0092f972c9d24ef7e72ff156057f5ea | |
parent | 23745d6a880e10836145c429518d3e5faded7c3d (diff) | |
download | rabbitmq-server-git-rabbit-fifo-release-cursor-fix.tar.gz |
rabbit_fifo: change release cursor calculationrabbit-fifo-release-cursor-fix
Release cursors are taken less frequently the more messages there are on
queue. This changes how this is calculated to simply use the message
count rather than some multiple of the currently captured release
cursors. This is more consistent and doesn't depend on non snapshottable
state.
-rw-r--r-- | src/rabbit_fifo.erl | 34 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 1 | ||||
-rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 21 |
3 files changed, 48 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d0b56961ec..7c7eba257d 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, overview(#?MODULE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, + enqueue_count = EnqCount, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> + Conf = #{name => Cfg#cfg.name, + resource => Cfg#cfg.resource, + release_cursor_interval => Cfg#cfg.release_cursor_interval, + dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + max_length => Cfg#cfg.max_length, + max_bytes => Cfg#cfg.max_bytes, + consumer_strategy => Cfg#cfg.consumer_strategy, + max_in_memory_length => Cfg#cfg.max_in_memory_length, + max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes}, #{type => ?MODULE, + config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), + release_crusor_enqueue_counter => EnqCount, enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx, %% the incoming enqueue must already have been dropped State0; true -> - State = convert_prefix_msgs(State0), - {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end), - rabbit_log:info("dehydrating state took ~bms", [Time div 1000]), + Interval = case Base of + 0 -> 0; + _ -> + Total = messages_total(State0), + min(max(Total, Base), + ?RELEASE_CURSOR_EVERY_MAX) + end, + State = convert_prefix_msgs( + State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}), + Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - Interval = lqueue:len(Cursors) * Base, - State#?MODULE{release_cursors = Cursors, - cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}} + State#?MODULE{release_cursors = Cursors} end; maybe_store_dehydrated_state(RaftIdx, #?MODULE{cfg = diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index b9e967cbb1..2a8899d593 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -68,6 +68,7 @@ % represents a partially applied module call -define(RELEASE_CURSOR_EVERY, 64000). +-define(RELEASE_CURSOR_EVERY_MAX, 3200000). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 2413e3391c..23522e71f9 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -45,6 +45,7 @@ all_tests() -> scenario19, scenario20, scenario21, + scenario22, single_active, single_active_01, single_active_02, @@ -376,6 +377,24 @@ scenario21(_Config) -> Commands), ok. +scenario22(_Config) -> + % C1Pid = c:pid(0,883,1), + % C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [ + make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + make_enqueue(E,4,<<"4">>), + make_enqueue(E,5,<<"5">>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + release_cursor_interval => 1, + max_length => 3, + dead_letter_handler => {?MODULE, banana, []}}, + Commands), + ok. + single_active_01(_Config) -> C1Pid = test_util:fake_pid(rabbit@fake_node1), C1 = {<<0>>, C1Pid}, @@ -1093,6 +1112,8 @@ run_proper(Fun, Args, NumTests) -> run_snapshot_test(Conf, Commands) -> %% create every incremental permutation of the commands lists %% and run the snapshot tests against that + ct:pal("running snapshot test with ~b commands using config ~p", + [length(Commands), Conf]), [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Conf, C) |