diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-03-05 11:55:28 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-03-05 11:55:28 +0000 |
| commit | 34f3a0066e050f3eaaebf67d73db462e4262b699 (patch) | |
| tree | e3dca0d8d0092f972c9d24ef7e72ff156057f5ea /src | |
| parent | 23745d6a880e10836145c429518d3e5faded7c3d (diff) | |
| download | rabbitmq-server-git-34f3a0066e050f3eaaebf67d73db462e4262b699.tar.gz | |
rabbit_fifo: change release cursor calculationrabbit-fifo-release-cursor-fix
Release cursors are taken less frequently the more messages there are on
queue. This changes how this is calculated to simply use the message
count rather than some multiple of the currently captured release
cursors. This is more consistent and doesn't depend on non snapshottable
state.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 1 |
2 files changed, 27 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d0b56961ec..7c7eba257d 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, overview(#?MODULE{consumers = Cons, enqueuers = Enqs, release_cursors = Cursors, + enqueue_count = EnqCount, msg_bytes_enqueue = EnqueueBytes, - msg_bytes_checkout = CheckoutBytes} = State) -> + msg_bytes_checkout = CheckoutBytes, + cfg = Cfg} = State) -> + Conf = #{name => Cfg#cfg.name, + resource => Cfg#cfg.resource, + release_cursor_interval => Cfg#cfg.release_cursor_interval, + dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler, + max_length => Cfg#cfg.max_length, + max_bytes => Cfg#cfg.max_bytes, + consumer_strategy => Cfg#cfg.consumer_strategy, + max_in_memory_length => Cfg#cfg.max_in_memory_length, + max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes}, #{type => ?MODULE, + config => Conf, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), num_messages => messages_total(State), num_release_cursors => lqueue:len(Cursors), + release_crusor_enqueue_counter => EnqCount, enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx, %% the incoming enqueue must already have been dropped State0; true -> - State = convert_prefix_msgs(State0), - {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end), - rabbit_log:info("dehydrating state took ~bms", [Time div 1000]), + Interval = case Base of + 0 -> 0; + _ -> + Total = messages_total(State0), + min(max(Total, Base), + ?RELEASE_CURSOR_EVERY_MAX) + end, + State = convert_prefix_msgs( + State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}), + Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), - Interval = lqueue:len(Cursors) * Base, - State#?MODULE{release_cursors = Cursors, - cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}} + State#?MODULE{release_cursors = Cursors} end; maybe_store_dehydrated_state(RaftIdx, #?MODULE{cfg = diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index b9e967cbb1..2a8899d593 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -68,6 +68,7 @@ % represents a partially applied module call -define(RELEASE_CURSOR_EVERY, 64000). +-define(RELEASE_CURSOR_EVERY_MAX, 3200000). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, |
