summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-10-11 09:26:42 +0100
committerkjnilsson <knilsson@pivotal.io>2019-10-11 12:04:47 +0100
commit851244f0fb6d4e264b660da340447914f459265a (patch)
tree6d3459b93dac0cc594e9644ecf405f866235afc6
parent2cbbcdbb48b5b8b87bb19bc0e08669987dc9f5d0 (diff)
downloadrabbitmq-server-git-851244f0fb6d4e264b660da340447914f459265a.tar.gz
Optimise QQ memory use
Take fewer release cursor snapshots points as the message backlog grows. Also introduces a compacted form of the internal message header map where initially it is only an integer representing the size of the message body. Later when additional keys need to be added it is expanded into a full map. This avoid creating and holding many individial maps with just a size element. [#169064158]
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_fifo.erl199
-rw-r--r--src/rabbit_fifo.hrl6
-rw-r--r--src/rabbit_quorum_queue.erl5
-rw-r--r--test/rabbit_fifo_SUITE.erl8
5 files changed, 155 insertions, 70 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8b5fe91cc9..2148a435b6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -762,7 +762,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ IsDelivered = case MsgHeader of
+ #{delivery_count := _} ->
+ true;
+ _ ->
+ false
+ end,
Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
{QName, From, MsgId, IsDelivered, Msg1},
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 062fb7eee1..37afefef7e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -588,8 +588,6 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
- %% TODO: call a handler that works out if any known nodes need to be
- %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
@@ -759,7 +757,8 @@ messages_ready(#?MODULE{messages = M,
prefix_msgs = {PreR, PreM},
returns = R}) ->
- %% TODO: optimise to avoid length/1 call
+ %% prefix messages will rarely have anything in them during normal
+ %% operations so length/1 is fine here
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
messages_total(#?MODULE{ra_indexes = I,
@@ -795,9 +794,9 @@ moving_average(Time, HalfLife, Next, Current) ->
Next * (1 - Weight) + Current * Weight.
num_checked_out(#?MODULE{consumers = Cons}) ->
- lists:foldl(fun (#consumer{checked_out = C}, Acc) ->
- maps:size(C) + Acc
- end, 0, maps:values(Cons)).
+ maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
+ maps:size(C) + Acc
+ end, 0, Cons).
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
@@ -949,13 +948,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
- next_msg_num = NextMsgNum} = State0) ->
- Header = #{size => message_size(RawMsg)},
+ low_msg_num = LowMsgNum,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
{State1, Msg} =
case evaluate_memory_limit(Header, State0) of
true ->
- {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map
+ % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}};
false ->
{add_in_memory_counts(Header, State0),
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
@@ -975,17 +977,30 @@ append_to_master_index(RaftIdx,
incr_enqueue_count(#?MODULE{enqueue_count = C,
- cfg = #cfg{release_cursor_interval = C}} = State0) ->
- % this will trigger a dehydrated version of the state to be stored
- % at this raft index for potential future snapshot generation
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) ->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first needs to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% conversion to new release cursor interval format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{ra_indexes = Indexes,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
enqueue_count = 0,
- release_cursors = Cursors} = State) ->
+ release_cursors = Cursors0} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
@@ -993,8 +1008,20 @@ maybe_store_dehydrated_state(RaftIdx,
true ->
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
- State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)}
+ Cursors = lqueue:in(Cursor, Cursors0),
+ Interval = lqueue:len(Cursors) * Base,
+ State#?MODULE{release_cursors = Cursors,
+ cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}
end;
+maybe_store_dehydrated_state(RaftIdx,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% convert to new format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1065,15 +1092,17 @@ complete(ConsumerId, Discarded,
#consumer{checked_out = Checked} = Con0, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
- credit = increase_credit(Con0, maps:size(Discarded))},
+ credit = increase_credit(Con0, map_size(Discarded))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
+ %% TODO: use maps:fold instead
State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
@@ -1162,14 +1191,21 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
+update_header(Key, UpdateFun, Default, Header)
+ when is_integer(Header) ->
+ update_header(Key, UpdateFun, Default, #{size => Header});
+update_header(Key, UpdateFun, Default, Header) ->
+ maps:update_with(Key, UpdateFun, Default, Header).
+
+
return_one(MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1198,8 +1234,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1211,13 +1246,15 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
{Msg, State1} = case RawMsg of
- 'empty' -> {Msg0, State0};
- _ -> case evaluate_memory_limit(Header, State0) of
- true ->
- {{RaftId, {Header, 'empty'}}, State0};
- false ->
- {Msg0, add_in_memory_counts(Header, State0)}
- end
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
end,
{add_bytes_return(
Header,
@@ -1289,13 +1326,18 @@ evaluate_limit(Result, State0, Effects0) ->
{State0, Result, Effects0}
end.
-evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
- max_in_memory_bytes = undefined}}) ->
+evaluate_memory_limit(_Header,
+ #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
false;
-evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
- max_in_memory_bytes = MaxBytes},
- msg_bytes_in_memory = Bytes,
- msgs_ready_in_memory = Length}) ->
+evaluate_memory_limit(#{size := Size}, State) ->
+ evaluate_memory_limit(Size, State);
+evaluate_memory_limit(Size,
+ #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length})
+ when is_integer(Size) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
@@ -1643,49 +1685,82 @@ make_purge_nodes(Nodes) ->
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue } = State) ->
+add_bytes_enqueue(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_enqueue(#{size := Bytes}, State) ->
+ add_bytes_enqueue(Bytes, State).
+
+add_bytes_drop(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_drop(#{size := Bytes}, State) ->
+ add_bytes_drop(Bytes, State).
+
+add_bytes_checkout(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
- msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-
-add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State) ->
+ msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_checkout(#{size := Bytes}, State) ->
+ add_bytes_checkout(Bytes, State).
+
+add_bytes_settle(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+add_bytes_settle(#{size := Bytes}, State) ->
+ add_bytes_settle(Bytes, State).
+
+add_bytes_return(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
- msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_return(#{size := Bytes}, State) ->
+ add_bytes_return(Bytes, State).
+
+add_in_memory_counts(Bytes,
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
- msgs_ready_in_memory = InMemoryCount + 1}.
+ msgs_ready_in_memory = InMemoryCount + 1};
+add_in_memory_counts(#{size := Bytes}, State) ->
+ add_in_memory_counts(Bytes, State).
-subtract_in_memory_counts(#{size := Bytes},
+subtract_in_memory_counts(Bytes,
#?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
- msgs_ready_in_memory = InMemoryCount - 1}.
+ msgs_ready_in_memory = InMemoryCount - 1};
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+ subtract_in_memory_counts(Bytes, State).
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', #{size := B}}) ->
- B;
-message_size({'$empty_msg', #{size := B}}) ->
- B;
+message_size({'$prefix_msg', H}) ->
+ get_size_from_header(H);
+message_size({'$empty_msg', H}) ->
+ get_size_from_header(H);
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+get_size_from_header(Size) when is_integer(Size) ->
+ Size;
+get_size_from_header(#{size := B}) ->
+ B.
+
+
all_nodes(#?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 0e9de0fb10..0d1d5ed2d1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -18,11 +18,13 @@
%% in enqueue messages. Used to ensure ordering of messages send from the
%% same process
--type msg_header() :: #{size := msg_size(),
+-type msg_header() :: msg_size() |
+ #{size := msg_size(),
delivery_count => non_neg_integer()}.
-%% The message header map:
+%% The message header:
%% delivery_count: the number of unsuccessful delivery attempts.
%% A non-zero value indicates a previous attempt.
+%% If it only contains the size it can be condensed to an integer only
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4f8c129291..b52678605b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -473,7 +473,10 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
{ok, empty, QState} ->
{ok, empty, QState};
{ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
- Count = maps:get(delivery_count, MsgHeader, 0),
+ Count = case MsgHeader of
+ #{delivery_count := C} -> C;
+ _ -> 0
+ end,
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0a0ac94e63..0d9acfa1fa 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -444,12 +444,12 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, Effects1} = check_n(Cid, 2, 10, State0),
?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects1),
{_State2, _, Effects2} = apply(meta(1),
rabbit_fifo:make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects2),
ok.
@@ -462,7 +462,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test(_) ->
{State0, [_, _]} = enq(1, 1, first, State00),
{State1, Effects1} = check_n(Cid, 2, 10, State0),
?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects1),
{_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
% assert mod call effect with appended reason and message
@@ -502,7 +502,7 @@ delivery_query_returns_deliveries_test(_) ->
Entries = lists:zip(Indexes, Commands),
{State, _Effects} = run_log(test_init(help), Entries),
% 3 deliveries are returned
- [{0, {#{}, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State),
+ [{0, {_, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State),
[_, _, _] = rabbit_fifo:get_checked_out(Cid, 1, 3, State),
ok.