diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-02 12:26:28 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-03 16:32:32 +0100 |
| commit | 19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch) | |
| tree | 0876af93cb80b793c44c57110a33e48e55e5b68a /src | |
| parent | 74aac975655b4d419ac53c911768048234edbb9a (diff) | |
| download | rabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz | |
Fix in-memory counters
[#164735591]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 113 |
1 files changed, 75 insertions, 38 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 0f957b4a19..d397459d4f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -897,17 +897,20 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), State = case Msg of - 'empty' -> subtract_in_memory_counts(Header, State1); - _ -> State1 + 'empty' -> State2; + _ -> subtract_in_memory_counts(Header, State2) end, Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; - {{'$prefix_msg', #{size := Bytes}}, State1} -> - State = add_bytes_drop(Bytes, State1), - {State, Effects0}; + {{'$prefix_msg', #{size := Bytes} = Header}, State1} -> + State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)), + {State2, Effects0}; + {{'$empty_msg', #{size := Bytes}}, State1} -> + State2 = add_bytes_drop(Bytes, State1), + {State2, Effects0}; empty -> {State0, Effects0} end. @@ -926,10 +929,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, end, State = add_bytes_enqueue(Size, State1), State#?MODULE{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + %% this is probably only done to record it when low_msg_num + %% is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, #?MODULE{ra_indexes = Indexes0} = State0) -> @@ -1016,6 +1019,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, fun({'$prefix_msg', _} = Msg, {S0, E0}) -> return_one(0, Msg, S0, E0, ConsumerId, Con); + ({'$empty_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0, + ConsumerId, Con); ({MsgNum, Msg}, {S0, E0}) -> return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) @@ -1062,6 +1068,8 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> add_bytes_settle(Header, Acc); ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> add_bytes_settle(Header, Acc) end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered @@ -1128,14 +1136,14 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', Header0}, +return_one(0, {Tag, Header0}, #?MODULE{returns = Returns, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> + Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'-> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {'$prefix_msg', Header}, + Msg = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> Checked = Con#consumer.checked_out, @@ -1144,8 +1152,12 @@ return_one(0, {'$prefix_msg', Header0}, {add_bytes_settle(Header, State1), Effects}; _ -> %% this should not affect the release cursor in any way - {add_bytes_return(Msg, - State0#?MODULE{returns = lqueue:in(Msg, Returns)}), + State1 = case Tag of + '$empty_msg' -> State0; + _ -> add_in_memory_counts(maps:get(size, Header), State0) + end, + {add_bytes_return(Header, + State1#?MODULE{returns = lqueue:in(Msg, Returns)}), Effects0} end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, @@ -1165,16 +1177,16 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Checked = Con#consumer.checked_out, {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), - State2 = case RawMsg of - 'empty' -> State1; - _ -> add_in_memory_counts(maps:get(size, Header), State1) - end, - {add_bytes_settle(Header, State2), Effects1}; + {add_bytes_settle(Header, State1), Effects1}; _ -> + State1 = case RawMsg of + 'empty' -> State0; + _ -> add_in_memory_counts(maps:get(size, Header), State0) + end, %% this should not affect the release cursor in any way - {add_bytes_return(RawMsg, - State0#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + {add_bytes_return(Header, + State1#?MODULE{returns = + lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. @@ -1184,6 +1196,8 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> Checked = lists:sort(maps:to_list(Checked0)), lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(0, Msg, S, E, ConsumerId, Consumer); + ({_, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(0, Msg, S, E, ConsumerId, Consumer); ({_, {MsgNum, Msg}}, {S, E}) -> return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) end, {State0, Effects0}, Checked). @@ -1269,6 +1283,9 @@ append_log_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages +take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) -> + %% there are prefix returns, these should be served first + {Msg, State#?MODULE{prefix_msgs = {Rem, P}}}; take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, @@ -1301,10 +1318,16 @@ take_next_msg(#?MODULE{returns = Returns, end end; empty -> - [Header | Rem] = P, - %% There are prefix msgs - {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {R, Rem}}} + [Msg | Rem] = P, + case Msg of + {Header, 'empty'} -> + %% There are prefix msgs + {{'$empty_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}}; + Header -> + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}} + end end. send_msg_effect({CTag, CPid}, Msgs) -> @@ -1360,16 +1383,20 @@ checkout_one(#?MODULE{service_queue = SQ0, consumers = Cons}, {State, Msg} = case ConsumerMsg of - {'$prefix_msg', _} -> - {add_bytes_checkout(ConsumerMsg, State1), + {'$prefix_msg', Header} -> + {subtract_in_memory_counts( + Header, add_bytes_checkout(Header, State1)), + ConsumerMsg}; + {'$empty_msg', Header} -> + {add_bytes_checkout(Header, State1), ConsumerMsg}; {_, {_, {Header, 'empty'}} = M} -> - {add_bytes_checkout(maps:get(size, Header), State1), + {add_bytes_checkout(Header, State1), M}; - {_, {_, {Header, RawMsg} = M}} -> + {_, {_, {Header, _} = M}} -> {subtract_in_memory_counts( Header, - add_bytes_checkout(RawMsg, State1)), + add_bytes_checkout(Header, State1)), M} end, {success, ConsumerId, Next, Msg, State}; @@ -1483,13 +1510,19 @@ dehydrate_state(#?MODULE{messages = Messages, %% TODO: optimise this function as far as possible PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; + ({'$empty_msg', _} = Msg, Acc) -> + [Msg | Acc]; + ({_, {_, {Header, 'empty'}}}, Acc) -> + [{'$empty_msg', Header} | Acc]; ({_, {_, {Header, _}}}, Acc) -> [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> - [Header| Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) -> + [Msg | Acc]; + ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), @@ -1509,6 +1542,10 @@ dehydrate_state(#?MODULE{messages = Messages, dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; + (_, {'$empty_msg', _} = M) -> + M; + (_, {_, {_, {Header, 'empty'}}}) -> + {'$empty_msg', Header}; (_, {_, {_, {Header, _}}}) -> {'$prefix_msg', Header} end, Checked0), @@ -1569,18 +1606,16 @@ add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}. add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. -add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}. @@ -1600,6 +1635,8 @@ message_size(#basic_message{content = Content}) -> iolist_size(PFR); message_size({'$prefix_msg', #{size := B}}) -> B; +message_size({'$empty_msg', #{size := B}}) -> + B; message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> |
