summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-02 12:26:28 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-03 16:32:32 +0100
commit19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch)
tree0876af93cb80b793c44c57110a33e48e55e5b68a /src
parent74aac975655b4d419ac53c911768048234edbb9a (diff)
downloadrabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz
Fix in-memory counters
[#164735591]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl113
1 files changed, 75 insertions, 38 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 0f957b4a19..d397459d4f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -897,17 +897,20 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
{FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
State = case Msg of
- 'empty' -> subtract_in_memory_counts(Header, State1);
- _ -> State1
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
end,
Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
- {{'$prefix_msg', #{size := Bytes}}, State1} ->
- State = add_bytes_drop(Bytes, State1),
- {State, Effects0};
+ {{'$prefix_msg', #{size := Bytes} = Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', #{size := Bytes}}, State1} ->
+ State2 = add_bytes_drop(Bytes, State1),
+ {State2, Effects0};
empty ->
{State0, Effects0}
end.
@@ -926,10 +929,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
end,
State = add_bytes_enqueue(Size, State1),
State#?MODULE{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ %% this is probably only done to record it when low_msg_num
+ %% is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
#?MODULE{ra_indexes = Indexes0} = State0) ->
@@ -1016,6 +1019,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
return_one(0, Msg, S0, E0,
ConsumerId, Con);
+ ({'$empty_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0,
+ ConsumerId, Con);
({MsgNum, Msg}, {S0, E0}) ->
return_one(MsgNum, Msg, S0, E0,
ConsumerId, Con)
@@ -1062,6 +1068,8 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
add_bytes_settle(Header, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
@@ -1128,14 +1136,14 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', Header0},
+return_one(0, {Tag, Header0},
#?MODULE{returns = Returns,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) ->
+ Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
- Msg = {'$prefix_msg', Header},
+ Msg = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
Checked = Con#consumer.checked_out,
@@ -1144,8 +1152,12 @@ return_one(0, {'$prefix_msg', Header0},
{add_bytes_settle(Header, State1), Effects};
_ ->
%% this should not affect the release cursor in any way
- {add_bytes_return(Msg,
- State0#?MODULE{returns = lqueue:in(Msg, Returns)}),
+ State1 = case Tag of
+ '$empty_msg' -> State0;
+ _ -> add_in_memory_counts(maps:get(size, Header), State0)
+ end,
+ {add_bytes_return(Header,
+ State1#?MODULE{returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
@@ -1165,16 +1177,16 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Checked = Con#consumer.checked_out,
{State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
Effects, State0),
- State2 = case RawMsg of
- 'empty' -> State1;
- _ -> add_in_memory_counts(maps:get(size, Header), State1)
- end,
- {add_bytes_settle(Header, State2), Effects1};
+ {add_bytes_settle(Header, State1), Effects1};
_ ->
+ State1 = case RawMsg of
+ 'empty' -> State0;
+ _ -> add_in_memory_counts(maps:get(size, Header), State0)
+ end,
%% this should not affect the release cursor in any way
- {add_bytes_return(RawMsg,
- State0#?MODULE{returns =
- lqueue:in({MsgNum, Msg}, Returns)}),
+ {add_bytes_return(Header,
+ State1#?MODULE{returns =
+ lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
@@ -1184,6 +1196,8 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
Checked = lists:sort(maps:to_list(Checked0)),
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
return_one(0, Msg, S, E, ConsumerId, Consumer);
+ ({_, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(0, Msg, S, E, ConsumerId, Consumer);
({_, {MsgNum, Msg}}, {S, E}) ->
return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
end, {State0, Effects0}, Checked).
@@ -1269,6 +1283,9 @@ append_log_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
+take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?MODULE{prefix_msgs = {Rem, P}}};
take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
@@ -1301,10 +1318,16 @@ take_next_msg(#?MODULE{returns = Returns,
end
end;
empty ->
- [Header | Rem] = P,
- %% There are prefix msgs
- {{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {R, Rem}}}
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}}
+ end
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1360,16 +1383,20 @@ checkout_one(#?MODULE{service_queue = SQ0,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- {'$prefix_msg', _} ->
- {add_bytes_checkout(ConsumerMsg, State1),
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
ConsumerMsg};
{_, {_, {Header, 'empty'}} = M} ->
- {add_bytes_checkout(maps:get(size, Header), State1),
+ {add_bytes_checkout(Header, State1),
M};
- {_, {_, {Header, RawMsg} = M}} ->
+ {_, {_, {Header, _} = M}} ->
{subtract_in_memory_counts(
Header,
- add_bytes_checkout(RawMsg, State1)),
+ add_bytes_checkout(Header, State1)),
M}
end,
{success, ConsumerId, Next, Msg, State};
@@ -1483,13 +1510,19 @@ dehydrate_state(#?MODULE{messages = Messages,
%% TODO: optimise this function as far as possible
PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
({_, {_, {Header, _}}}, Acc) ->
[Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
- [Header| Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) ->
+ [Msg | Acc];
+ ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
@@ -1509,6 +1542,10 @@ dehydrate_state(#?MODULE{messages = Messages,
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
+ (_, {'$empty_msg', _} = M) ->
+ M;
+ (_, {_, {_, {Header, 'empty'}}}) ->
+ {'$empty_msg', Header};
(_, {_, {_, {Header, _}}}) ->
{'$prefix_msg', Header}
end, Checked0),
@@ -1569,18 +1606,16 @@ add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes}.
add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
@@ -1600,6 +1635,8 @@ message_size(#basic_message{content = Content}) ->
iolist_size(PFR);
message_size({'$prefix_msg', #{size := B}}) ->
B;
+message_size({'$empty_msg', #{size := B}}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->