diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 12:33:37 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 12:33:37 +0100 |
| commit | 27ae52584f210b46138ce2750024d23da7166753 (patch) | |
| tree | 006b1d33ed74daacfb7b94dab6f901bd2ca7fe68 | |
| parent | 279a35b6aaf9e9b7ce7394500b9641eadfaebd0a (diff) | |
| download | rabbitmq-server-git-27ae52584f210b46138ce2750024d23da7166753.tar.gz | |
Apply memory limit on returns
[#164735591]
| -rw-r--r-- | src/rabbit_fifo.erl | 76 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 45 |
2 files changed, 87 insertions, 34 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 129dab77ff..231ecf42c6 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -925,10 +925,10 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of - {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}}, + {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - State2 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), State = case Msg of 'empty' -> State2; _ -> subtract_in_memory_counts(Header, State2) @@ -936,11 +936,11 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; - {{'$prefix_msg', #{size := Bytes} = Header}, State1} -> - State2 = subtract_in_memory_counts(Header, add_bytes_drop(Bytes, State1)), + {{'$prefix_msg', Header}, State1} -> + State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)), {State2, Effects0}; - {{'$empty_msg', #{size := Bytes}}, State1} -> - State2 = add_bytes_drop(Bytes, State1), + {{'$empty_msg', Header}, State1} -> + State2 = add_bytes_drop(Header, State1), {State2, Effects0}; empty -> {State0, Effects0} @@ -949,16 +949,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> - Size = message_size(RawMsg), + Header = #{size => message_size(RawMsg)}, {State1, Msg} = - case evaluate_memory_limit(Size, State0) of + case evaluate_memory_limit(Header, State0) of true -> - {State0, {RaftIdx, {#{size => Size}, 'empty'}}}; % indexed message with header map + {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map false -> - {add_in_memory_counts(Size, State0), - {RaftIdx, {#{size => Size}, RawMsg}}} % indexed message with header map + {add_in_memory_counts(Header, State0), + {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, - State = add_bytes_enqueue(Size, State1), + State = add_bytes_enqueue(Header, State1), State#?MODULE{messages = Messages#{NextMsgNum => Msg}, %% this is probably only done to record it when low_msg_num %% is undefined @@ -1168,17 +1168,22 @@ return_one(MsgId, 0, {Tag, Header0}, #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {Tag, Header}, + Msg0 = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0); + complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, - State1 = case Tag of - '$empty_msg' -> State0; - _ -> add_in_memory_counts(maps:get(size, Header), State0) - end, + {Msg, State1} = case Tag of + '$empty_msg' -> {Msg0, State0}; + _ -> case evaluate_memory_limit(Header, State0) of + true -> + {{'$empty_msg', Header}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, {add_bytes_return( Header, State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, @@ -1193,20 +1198,25 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {RaftId, {Header, RawMsg}}, + Msg0 = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - DlMsg = {MsgNum, Msg}, + DlMsg = {MsgNum, Msg0}, Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way - State1 = case RawMsg of - 'empty' -> State0; - _ -> add_in_memory_counts(maps:get(size, Header), State0) - end, + {Msg, State1} = case RawMsg of + 'empty' -> {Msg0, State0}; + _ -> case evaluate_memory_limit(Header, State0) of + true -> + {{RaftId, {Header, 'empty'}}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, {add_bytes_return( Header, State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, @@ -1277,13 +1287,13 @@ evaluate_limit(Result, State0, Effects0) -> {State0, Result, Effects0} end. -evaluate_memory_limit(_Size, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, - max_in_memory_bytes = undefined}}) -> +evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> false; -evaluate_memory_limit(Size, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, - max_in_memory_bytes = MaxBytes}, - msg_bytes_in_memory = Bytes, - msgs_ready_in_memory = Length}) -> +evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> @@ -1628,10 +1638,10 @@ make_purge_nodes(Nodes) -> make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> +add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}. -add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> +add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, @@ -1647,7 +1657,7 @@ add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}. -add_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes, +add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes, msgs_ready_in_memory = InMemoryCount} = State) -> State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, msgs_ready_in_memory = InMemoryCount + 1}. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 6b623304e8..598765e4cb 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -119,6 +119,7 @@ all_tests() -> queue_length_in_memory_limit_basic_get, queue_length_in_memory_limit_subscribe, queue_length_in_memory_limit, + queue_length_in_memory_limit_returns, queue_length_in_memory_bytes_limit_basic_get, queue_length_in_memory_bytes_limit_subscribe, queue_length_in_memory_bytes_limit, @@ -1924,7 +1925,7 @@ queue_length_in_memory_limit(Config) -> ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), - + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})), @@ -1936,6 +1937,48 @@ queue_length_in_memory_limit(Config) -> ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). +queue_length_in_memory_limit_returns(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg111">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false})), + + {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = Msg2}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + + publish(Ch, QQ, Msg3), + publish(Ch, QQ, Msg4), + + %% Ensure that returns are subject to in memory limits too + wait_for_messages(Config, [[QQ, <<"4">>, <<"2">>, <<"2">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg3) + byte_size(Msg4)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + queue_length_in_memory_bytes_limit_basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
