diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 199 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 5 |
4 files changed, 151 insertions, 66 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8b5fe91cc9..2148a435b6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -762,7 +762,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, end, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + IsDelivered = case MsgHeader of + #{delivery_count := _} -> + true; + _ -> + false + end, Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, {QName, From, MsgId, IsDelivered, Msg1}, diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 062fb7eee1..37afefef7e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -588,8 +588,6 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, - %% TODO: call a handler that works out if any known nodes need to be - %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. @@ -759,7 +757,8 @@ messages_ready(#?MODULE{messages = M, prefix_msgs = {PreR, PreM}, returns = R}) -> - %% TODO: optimise to avoid length/1 call + %% prefix messages will rarely have anything in them during normal + %% operations so length/1 is fine here maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). messages_total(#?MODULE{ra_indexes = I, @@ -795,9 +794,9 @@ moving_average(Time, HalfLife, Next, Current) -> Next * (1 - Weight) + Current * Weight. num_checked_out(#?MODULE{consumers = Cons}) -> - lists:foldl(fun (#consumer{checked_out = C}, Acc) -> - maps:size(C) + Acc - end, 0, maps:values(Cons)). + maps:fold(fun (_, #consumer{checked_out = C}, Acc) -> + maps:size(C) + Acc + end, 0, Cons). cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, @@ -949,13 +948,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> end. enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, - low_msg_num = LowMsgNum, - next_msg_num = NextMsgNum} = State0) -> - Header = #{size => message_size(RawMsg)}, + low_msg_num = LowMsgNum, + next_msg_num = NextMsgNum} = State0) -> + %% the initial header is an integer only - it will get expanded to a map + %% when the next required key is added + Header = message_size(RawMsg), {State1, Msg} = case evaluate_memory_limit(Header, State0) of true -> - {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map + % indexed message with header map + {State0, {RaftIdx, {Header, 'empty'}}}; false -> {add_in_memory_counts(Header, State0), {RaftIdx, {Header, RawMsg}}} % indexed message with header map @@ -975,17 +977,30 @@ append_to_master_index(RaftIdx, incr_enqueue_count(#?MODULE{enqueue_count = C, - cfg = #cfg{release_cursor_interval = C}} = State0) -> - % this will trigger a dehydrated version of the state to be stored - % at this raft index for potential future snapshot generation + cfg = #cfg{release_cursor_interval = {_Base, C}} + } = State0) -> + %% this will trigger a dehydrated version of the state to be stored + %% at this raft index for potential future snapshot generation + %% Q: Why don't we just stash the release cursor here? + %% A: Because it needs to be the very last thing we do and we + %% first needs to run the checkout logic. State0#?MODULE{enqueue_count = 0}; +incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% conversion to new release cursor interval format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + incr_enqueue_count(State); incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> State#?MODULE{enqueue_count = C + 1}. maybe_store_dehydrated_state(RaftIdx, - #?MODULE{ra_indexes = Indexes, + #?MODULE{cfg = + #cfg{release_cursor_interval = {Base, _}} + = Cfg, + ra_indexes = Indexes, enqueue_count = 0, - release_cursors = Cursors} = State) -> + release_cursors = Cursors0} = State) -> case rabbit_fifo_index:exists(RaftIdx, Indexes) of false -> %% the incoming enqueue must already have been dropped @@ -993,8 +1008,20 @@ maybe_store_dehydrated_state(RaftIdx, true -> Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, - State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)} + Cursors = lqueue:in(Cursor, Cursors0), + Interval = lqueue:len(Cursors) * Base, + State#?MODULE{release_cursors = Cursors, + cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}} end; +maybe_store_dehydrated_state(RaftIdx, + #?MODULE{cfg = + #cfg{release_cursor_interval = C} = Cfg} + = State0) + when is_integer(C) -> + %% convert to new format + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, + maybe_store_dehydrated_state(RaftIdx, State); maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1065,15 +1092,17 @@ complete(ConsumerId, Discarded, #consumer{checked_out = Checked} = Con0, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> + %% TODO optimise use of Discarded map here MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), - credit = increase_credit(Con0, maps:size(Discarded))}, + credit = increase_credit(Con0, map_size(Discarded))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), + %% TODO: use maps:fold instead State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> add_bytes_settle(Header, Acc); ({'$prefix_msg', Header}, Acc) -> @@ -1162,14 +1191,21 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. +update_header(Key, UpdateFun, Default, Header) + when is_integer(Header) -> + update_header(Key, UpdateFun, Default, #{size => Header}); +update_header(Key, UpdateFun, Default, Header) -> + maps:update_with(Key, UpdateFun, Default, Header). + + return_one(MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' -> + Effects0, ConsumerId) + when Tag == '$prefix_msg'; Tag == '$empty_msg' -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Header = maps:update_with(delivery_count, fun (C) -> C+1 end, - 1, Header0), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg0 = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1198,8 +1234,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Header = maps:update_with(delivery_count, fun (C) -> C+1 end, - 1, Header0), + Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg0 = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1211,13 +1246,15 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way {Msg, State1} = case RawMsg of - 'empty' -> {Msg0, State0}; - _ -> case evaluate_memory_limit(Header, State0) of - true -> - {{RaftId, {Header, 'empty'}}, State0}; - false -> - {Msg0, add_in_memory_counts(Header, State0)} - end + 'empty' -> + {Msg0, State0}; + _ -> + case evaluate_memory_limit(Header, State0) of + true -> + {{RaftId, {Header, 'empty'}}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end end, {add_bytes_return( Header, @@ -1289,13 +1326,18 @@ evaluate_limit(Result, State0, Effects0) -> {State0, Result, Effects0} end. -evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, - max_in_memory_bytes = undefined}}) -> +evaluate_memory_limit(_Header, + #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> false; -evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, - max_in_memory_bytes = MaxBytes}, - msg_bytes_in_memory = Bytes, - msgs_ready_in_memory = Length}) -> +evaluate_memory_limit(#{size := Size}, State) -> + evaluate_memory_limit(Size, State); +evaluate_memory_limit(Size, + #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) + when is_integer(Size) -> (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> @@ -1643,49 +1685,82 @@ make_purge_nodes(Nodes) -> make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> - State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}. - -add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> - State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. - -add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue } = State) -> +add_bytes_enqueue(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_enqueue(#{size := Bytes}, State) -> + add_bytes_enqueue(Bytes, State). + +add_bytes_drop(Bytes, + #?MODULE{msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_drop(#{size := Bytes}, State) -> + add_bytes_drop(Bytes, State). + +add_bytes_checkout(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue } = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_checkout = Checkout + Bytes, - msg_bytes_enqueue = Enqueue - Bytes}. - -add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> - State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. - -add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, - msg_bytes_enqueue = Enqueue} = State) -> + msg_bytes_enqueue = Enqueue - Bytes}; +add_bytes_checkout(#{size := Bytes}, State) -> + add_bytes_checkout(Bytes, State). + +add_bytes_settle(Bytes, + #?MODULE{msg_bytes_checkout = Checkout} = State) + when is_integer(Bytes) -> + State#?MODULE{msg_bytes_checkout = Checkout - Bytes}; +add_bytes_settle(#{size := Bytes}, State) -> + add_bytes_settle(Bytes, State). + +add_bytes_return(Bytes, + #?MODULE{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes, - msg_bytes_enqueue = Enqueue + Bytes}. - -add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes, - msgs_ready_in_memory = InMemoryCount} = State) -> + msg_bytes_enqueue = Enqueue + Bytes}; +add_bytes_return(#{size := Bytes}, State) -> + add_bytes_return(Bytes, State). + +add_in_memory_counts(Bytes, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, - msgs_ready_in_memory = InMemoryCount + 1}. + msgs_ready_in_memory = InMemoryCount + 1}; +add_in_memory_counts(#{size := Bytes}, State) -> + add_in_memory_counts(Bytes, State). -subtract_in_memory_counts(#{size := Bytes}, +subtract_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes, - msgs_ready_in_memory = InMemoryCount} = State) -> + msgs_ready_in_memory = InMemoryCount} = State) + when is_integer(Bytes) -> State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, - msgs_ready_in_memory = InMemoryCount - 1}. + msgs_ready_in_memory = InMemoryCount - 1}; +subtract_in_memory_counts(#{size := Bytes}, State) -> + subtract_in_memory_counts(Bytes, State). message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); -message_size({'$prefix_msg', #{size := B}}) -> - B; -message_size({'$empty_msg', #{size := B}}) -> - B; +message_size({'$prefix_msg', H}) -> + get_size_from_header(H); +message_size({'$empty_msg', H}) -> + get_size_from_header(H); message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +get_size_from_header(Size) when is_integer(Size) -> + Size; +get_size_from_header(#{size := B}) -> + B. + + all_nodes(#?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 0e9de0fb10..0d1d5ed2d1 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -18,11 +18,13 @@ %% in enqueue messages. Used to ensure ordering of messages send from the %% same process --type msg_header() :: #{size := msg_size(), +-type msg_header() :: msg_size() | + #{size := msg_size(), delivery_count => non_neg_integer()}. -%% The message header map: +%% The message header: %% delivery_count: the number of unsuccessful delivery attempts. %% A non-zero value indicates a previous attempt. +%% If it only contains the size it can be condensed to an integer only -type msg() :: {msg_header(), raw_msg()}. %% message with a header map. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 4f8c129291..b52678605b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -473,7 +473,10 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> {ok, empty, QState} -> {ok, empty, QState}; {ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} -> - Count = maps:get(delivery_count, MsgHeader, 0), + Count = case MsgHeader of + #{delivery_count := C} -> C; + _ -> 0 + end, IsDelivered = Count > 0, Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState}; |
