diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-10-11 09:26:42 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-10-11 12:04:47 +0100 |
| commit | 851244f0fb6d4e264b660da340447914f459265a (patch) | |
| tree | 6d3459b93dac0cc594e9644ecf405f866235afc6 | |
| parent | 2cbbcdbb48b5b8b87bb19bc0e08669987dc9f5d0 (diff) | |
| download | rabbitmq-server-git-851244f0fb6d4e264b660da340447914f459265a.tar.gz | |
Optimise QQ memory use
Take fewer release cursor snapshots points as the message backlog grows.
Also introduces a compacted form of the internal message header map
where initially it is only an integer representing the size of the
message body. Later when additional keys need to be added it is expanded
into a full map. This avoid creating and holding many individial maps
with just a size element.
[#169064158]
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 199 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 5 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 8 |
5 files changed, 155 insertions, 70 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8b5fe91cc9..2148a435b6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -762,7 +762,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, end, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + IsDelivered = case MsgHeader of + #{delivery_count := _} -> + true; + _ -> + false + end, Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, {QName, From, MsgId, IsDelivered, Msg1}, diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 062fb7eee1..37afefef7e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -588,8 +588,6 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, - %% TODO: call a handler that works out if any known nodes need to be - %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. @@ -759,7 +757,8 @@ messages_ready(#?MODULE{messages = M, prefix_msgs = {PreR, PreM}, returns = R}) -> - %% TODO: optimise to avoid length/1 call + %% prefix messages will rarely have anything in them during normal + %% operations so length/1 is fine here maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). messages_total(#?MODULE{ra_indexes = I, @@ -795,9 +794,9 @@ moving_average(Time, HalfLife, Next, Current) -> Next * (1 - Weight) + Current * Weight. num_checked_out(#?MODULE{consumers = Cons}) -> - lists:foldl(fun (#consumer{checked_out = C}, Acc) -> - maps:size(C) + Acc - end, 0, maps:values(Cons)). + maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> + maps:size(C) + Acc + end, 0, Cons). cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, @@ -949,13 +948,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> end. enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, - low_msg_num = LowMsgNum, - next_msg_num = NextMsgNum} = State0) -> - Header = #{size => message_size(RawMsg)}, + low_msg_num = LowMsgNum, + next_msg_num = NextMsgNum} = State0) -> + %% the initial header is an integer only - it will get expanded to a map + %% when the next required key is added + Header = message_size(RawMsg), {State1, Msg} = case evaluate_memory_limit(Header, State0) of true -> - {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map + % indexed message with header map + {State0, {RaftIdx, {Header, 'empty'}}}; false -> {add_in_memory_counts(Header, State0), {RaftIdx, {Header, RawMsg}}} % indexed message with header map @@ -975,17 +977,30 @@ append_to_master_index(RaftIdx, incr_enqueue_count(#?MODULE{enqueue_count = C, - cfg = #cfg{release_cursor_interval = C}} = State0) -> - % this will trigger a dehydrated version of the state to be stored - % at this raft index for potential future snapshot generation + cfg = #cfg{release_cursor_interval = {_Base, C}} + } = State0) -> + %% this will trigger a dehydrated version of the state to be stored + %% at this raft index for potential future snapshot generation + %% Q: Why don't we just stash the release cursor here? + %% A: Because it needs to be the very last thing we do and we + %% first needs to run the checkout logic. State0#?MODULE{enqueue_count = 0}; +incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% conversion to new release cursor interval format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + incr_enqueue_count(State); incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> State#?MODULE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #?MODULE{ra_indexes = Indexes, + #?MODULE{cfg = + #cfg{release_cursor_interval = {Base, _}} + = Cfg, + ra_indexes = Indexes, enqueue_count = 0, - release_cursors = Cursors} = State) -> + release_cursors = Cursors0} = State) -> case rabbit_fifo_index:exists(RaftIdx, Indexes) of false -> %% the incoming enqueue must already have been dropped @@ -993,8 +1008,20 @@ maybe_store_dehydrated_state(RaftIdx, true -> Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, - State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)} + Cursors = lqueue:in(Cursor, Cursors0), + Interval = lqueue:len(Cursors) * Base, + State#?MODULE{release_cursors = Cursors, + cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}} end; +maybe_store_dehydrated_state(RaftIdx, + #?MODULE{cfg = + #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% convert to new format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + maybe_store_dehydrated_state(RaftIdx, State); maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1065,15 +1092,17 @@ complete(ConsumerId, Discarded, #consumer{checked_out = Checked} = Con0, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> + %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), - credit = increase_credit(Con0, maps:size(Discarded))}, + credit = increase_credit(Con0, map_size(Discarded))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), + %% TODO: use maps:fold instead State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> add_bytes_settle(Header, Acc); ({'$prefix_msg', Header}, Acc) -> @@ -1162,14 +1191,21 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. +update_header(Key, UpdateFun, Default, Header) + when is_integer(Header) -> + update_header(Key, UpdateFun, Default, #{size => Header}); +update_header(Key, UpdateFun, Default, Header) -> + maps:update_with(Key, UpdateFun, Default, Header). + + return_one(MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' -> + Effects0, ConsumerId) + when Tag == '$prefix_msg'; Tag == '$empty_msg' -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Header = maps:update_with(delivery_count, fun (C) -> C+1 end, - 1, Header0), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg0 = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1198,8 +1234,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Header = maps:update_with(delivery_count, fun (C) -> C+1 end, - 1, Header0), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg0 = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1211,13 +1246,15 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way {Msg, State1} = case RawMsg of - 'empty' -> {Msg0, State0}; - _ -> case evaluate_memory_limit(Header, State0) of - true -> - {{RaftId, {Header, 'empty'}}, State0}; - false -> - {Msg0, add_in_memory_counts(Header, State0)} - end + 'empty' -> + {Msg0, State0}; + _ -> + case evaluate_memory_limit(Header, State0) of + true -> + {{RaftId, {Header, 'empty'}}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end end, {add_bytes_return( Header, @@ -1289,13 +1326,18 @@ evaluate_limit(Result, State0, Effects0) -> {State0, Result, Effects0} end. -evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, - max_in_memory_bytes = undefined}}) -> +evaluate_memory_limit(_Header, + #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> false; -evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, - max_in_memory_bytes = MaxBytes}, - msg_bytes_in_memory = Bytes, - msgs_ready_in_memory = Length}) -> +evaluate_memory_limit(#{size := Size}, State) -> + evaluate_memory_limit(Size, State); +evaluate_memory_limit(Size, + #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) + when is_integer(Size) -> (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> @@ -1643,49 +1685,82 @@ make_purge_nodes(Nodes) -> make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> - State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}. - -add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> - State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. - -add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue } = State) -> +add_bytes_enqueue(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_enqueue(#{size := Bytes}, State) -> + add_bytes_enqueue(Bytes, State). + +add_bytes_drop(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_drop(#{size := Bytes}, State) -> + add_bytes_drop(Bytes, State). + +add_bytes_checkout(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue } = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_checkout = Checkout + Bytes, - msg_bytes_enqueue = Enqueue - Bytes}. - -add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. - -add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue} = State) -> + msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_checkout(#{size := Bytes}, State) -> + add_bytes_checkout(Bytes, State). + +add_bytes_settle(Bytes, + #?MODULE{msg_bytes_checkout = Checkout} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; +add_bytes_settle(#{size := Bytes}, State) -> + add_bytes_settle(Bytes, State). + +add_bytes_return(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes, - msg_bytes_enqueue = Enqueue + Bytes}. - -add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes, - msgs_ready_in_memory = InMemoryCount} = State) -> + msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_return(#{size := Bytes}, State) -> + add_bytes_return(Bytes, State). + +add_in_memory_counts(Bytes, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, - msgs_ready_in_memory = InMemoryCount + 1}. + msgs_ready_in_memory = InMemoryCount + 1}; +add_in_memory_counts(#{size := Bytes}, State) -> + add_in_memory_counts(Bytes, State). -subtract_in_memory_counts(#{size := Bytes}, +subtract_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes, - msgs_ready_in_memory = InMemoryCount} = State) -> + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, - msgs_ready_in_memory = InMemoryCount - 1}. + msgs_ready_in_memory = InMemoryCount - 1}; +subtract_in_memory_counts(#{size := Bytes}, State) -> + subtract_in_memory_counts(Bytes, State). message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); -message_size({'$prefix_msg', #{size := B}}) -> - B; -message_size({'$empty_msg', #{size := B}}) -> - B; +message_size({'$prefix_msg', H}) -> + get_size_from_header(H); +message_size({'$empty_msg', H}) -> + get_size_from_header(H); message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +get_size_from_header(Size) when is_integer(Size) -> + Size; +get_size_from_header(#{size := B}) -> + B. + + all_nodes(#?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 0e9de0fb10..0d1d5ed2d1 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -18,11 +18,13 @@ %% in enqueue messages. Used to ensure ordering of messages send from the %% same process --type msg_header() :: #{size := msg_size(), +-type msg_header() :: msg_size() | + #{size := msg_size(), delivery_count => non_neg_integer()}. -%% The message header map: +%% The message header: %% delivery_count: the number of unsuccessful delivery attempts. %% A non-zero value indicates a previous attempt. +%% If it only contains the size it can be condensed to an integer only -type msg() :: {msg_header(), raw_msg()}. %% message with a header map. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4f8c129291..b52678605b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -473,7 +473,10 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> {ok, empty, QState} -> {ok, empty, QState}; {ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} -> - Count = maps:get(delivery_count, MsgHeader, 0), + Count = case MsgHeader of + #{delivery_count := C} -> C; + _ -> 0 + end, IsDelivered = Count > 0, Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState}; diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 0a0ac94e63..0d9acfa1fa 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -444,12 +444,12 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, Effects1} = check_n(Cid, 2, 10, State0), ?ASSERT_EFF({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, + {delivery, _, [{0, {_, first}}]}, _}, Effects1), {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), ?assertNoEffect({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, + {delivery, _, [{0, {_, first}}]}, _}, Effects2), ok. @@ -462,7 +462,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test(_) -> {State0, [_, _]} = enq(1, 1, first, State00), {State1, Effects1} = check_n(Cid, 2, 10, State0), ?ASSERT_EFF({send_msg, _, - {delivery, _, [{0, {#{}, first}}]}, _}, + {delivery, _, [{0, {_, first}}]}, _}, Effects1), {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), % assert mod call effect with appended reason and message @@ -502,7 +502,7 @@ delivery_query_returns_deliveries_test(_) -> Entries = lists:zip(Indexes, Commands), {State, _Effects} = run_log(test_init(help), Entries), % 3 deliveries are returned - [{0, {#{}, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State), + [{0, {_, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State), [_, _, _] = rabbit_fifo:get_checked_out(Cid, 1, 3, State), ok. |
