summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-04 12:33:37 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-04 12:33:37 +0100
commit27ae52584f210b46138ce2750024d23da7166753 (patch)
tree006b1d33ed74daacfb7b94dab6f901bd2ca7fe68
parent279a35b6aaf9e9b7ce7394500b9641eadfaebd0a (diff)
downloadrabbitmq-server-git-27ae52584f210b46138ce2750024d23da7166753.tar.gz
Apply memory limit on returns
[#164735591]
-rw-r--r--src/rabbit_fifo.erl76
-rw-r--r--test/quorum_queue_SUITE.erl45
2 files changed, 87 insertions, 34 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 129dab77ff..231ecf42c6 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -925,10 +925,10 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
- {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}},
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}),
State = case Msg of
'empty' -> State2;
_ -> subtract_in_memory_counts(Header, State2)
@@ -936,11 +936,11 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
- {{'$prefix_msg', #{size := Bytes} = Header}, State1} ->
- State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)),
+ {{'$prefix_msg', Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)),
{State2, Effects0};
- {{'$empty_msg', #{size := Bytes}}, State1} ->
- State2 = add_bytes_drop(Bytes, State1),
+ {{'$empty_msg', Header}, State1} ->
+ State2 = add_bytes_drop(Header, State1),
{State2, Effects0};
empty ->
{State0, Effects0}
@@ -949,16 +949,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
- Size = message_size(RawMsg),
+ Header = #{size => message_size(RawMsg)},
{State1, Msg} =
- case evaluate_memory_limit(Size, State0) of
+ case evaluate_memory_limit(Header, State0) of
true ->
- {State0, {RaftIdx, {#{size => Size}, 'empty'}}}; % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map
false ->
- {add_in_memory_counts(Size, State0),
- {RaftIdx, {#{size => Size}, RawMsg}}} % indexed message with header map
+ {add_in_memory_counts(Header, State0),
+ {RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
- State = add_bytes_enqueue(Size, State1),
+ State = add_bytes_enqueue(Header, State1),
State#?MODULE{messages = Messages#{NextMsgNum => Msg},
%% this is probably only done to record it when low_msg_num
%% is undefined
@@ -1168,17 +1168,22 @@ return_one(MsgId, 0, {Tag, Header0},
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
- Msg = {Tag, Header},
+ Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0);
+ complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
- State1 = case Tag of
- '$empty_msg' -> State0;
- _ -> add_in_memory_counts(maps:get(size, Header), State0)
- end,
+ {Msg, State1} = case Tag of
+ '$empty_msg' -> {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{'$empty_msg', Header}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
{add_bytes_return(
Header,
State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
@@ -1193,20 +1198,25 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
- Msg = {RaftId, {Header, RawMsg}},
+ Msg0 = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- DlMsg = {MsgNum, Msg},
+ DlMsg = {MsgNum, Msg0},
Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
- State1 = case RawMsg of
- 'empty' -> State0;
- _ -> add_in_memory_counts(maps:get(size, Header), State0)
- end,
+ {Msg, State1} = case RawMsg of
+ 'empty' -> {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
{add_bytes_return(
Header,
State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
@@ -1277,13 +1287,13 @@ evaluate_limit(Result, State0, Effects0) ->
{State0, Result, Effects0}
end.
-evaluate_memory_limit(_Size, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
- max_in_memory_bytes = undefined}}) ->
+evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
false;
-evaluate_memory_limit(Size, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
- max_in_memory_bytes = MaxBytes},
- msg_bytes_in_memory = Bytes,
- msgs_ready_in_memory = Length}) ->
+evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
@@ -1628,10 +1638,10 @@ make_purge_nodes(Nodes) ->
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
@@ -1647,7 +1657,7 @@ add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
-add_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
msgs_ready_in_memory = InMemoryCount} = State) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
msgs_ready_in_memory = InMemoryCount + 1}.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 6b623304e8..598765e4cb 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -119,6 +119,7 @@ all_tests() ->
queue_length_in_memory_limit_basic_get,
queue_length_in_memory_limit_subscribe,
queue_length_in_memory_limit,
+ queue_length_in_memory_limit_returns,
queue_length_in_memory_bytes_limit_basic_get,
queue_length_in_memory_bytes_limit_subscribe,
queue_length_in_memory_bytes_limit,
@@ -1924,7 +1925,7 @@ queue_length_in_memory_limit(Config) ->
?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
-
+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})),
@@ -1936,6 +1937,48 @@ queue_length_in_memory_limit(Config) ->
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+queue_length_in_memory_limit_returns(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg111">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false})),
+
+ {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = Msg2}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+
+ publish(Ch, QQ, Msg3),
+ publish(Ch, QQ, Msg4),
+
+ %% Ensure that returns are subject to in memory limits too
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"2">>, <<"2">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg3) + byte_size(Msg4)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
queue_length_in_memory_bytes_limit_basic_get(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),