diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 85 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 10 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 4 |
5 files changed, 104 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a8fd5a5081..fac9d5e50f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -685,6 +685,8 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}, {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, {<<"x-queue-mode">>, fun check_queue_mode/2}, diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index e43e94a385..549da8acbb 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -46,6 +46,7 @@ query_consumers/1, query_stat/1, query_single_active_consumer/1, + query_in_memory_usage/1, usage/1, zero/1, @@ -130,6 +131,8 @@ update_config(Conf, State) -> SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), + MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined), + MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined), DeliveryLimit = maps:get(delivery_limit, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> @@ -143,6 +146,8 @@ update_config(Conf, State) -> become_leader_handler = BLH, max_length = MaxLength, max_bytes = MaxBytes, + max_in_memory_length = MaxMemoryLength, + max_in_memory_bytes = MaxMemoryBytes, consumer_strategy = ConsumerStrategy, delivery_limit = DeliveryLimit}}. @@ -267,6 +272,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), + %% TODO handle this checkout_one!!! case Settlement of unsettled -> {_, Pid} = ConsumerId, @@ -698,6 +704,10 @@ query_single_active_consumer(_) -> query_stat(#?MODULE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. +query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> + {Length, Bytes}. + -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> case ets:lookup(rabbit_fifo_usage, Name) of @@ -878,11 +888,14 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of - {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}}, + {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - Bytes = message_size(Msg), - State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State = case Msg of + 'empty' -> subtract_in_memory_counts(Header, State1); + _ -> State1 + end, Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; @@ -897,8 +910,15 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> Size = message_size(RawMsg), - Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map - State = add_bytes_enqueue(Size, State0), + {State1, Msg} = + case evaluate_memory_limit(Size, State0) of + true -> + {State0, {RaftIdx, {#{size => Size}, 'empty'}}}; % indexed message with header map + false -> + {add_in_memory_counts(Size, State0), + {RaftIdx, {#{size => Size}, RawMsg}}} % indexed message with header map + end, + State = add_bytes_enqueue(Size, State1), State#?MODULE{messages = Messages#{NextMsgNum => Msg}, % this is probably only done to record it when low_msg_num % is undefined @@ -1033,10 +1053,10 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], - State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> - add_bytes_settle(RawMsg, Acc); - ({'$prefix_msg', _} = M, Acc) -> - add_bytes_settle(M, Acc) + State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc) end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension @@ -1115,7 +1135,7 @@ return_one(0, {'$prefix_msg', Header0}, Checked = Con#consumer.checked_out, {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, Effects0, State0), - {add_bytes_settle(Msg, State1), Effects}; + {add_bytes_settle(Header, State1), Effects}; _ -> %% this should not affect the release cursor in any way {add_bytes_return(Msg, @@ -1139,7 +1159,11 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Checked = Con#consumer.checked_out, {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), - {add_bytes_settle(RawMsg, State1), Effects1}; + State2 = case RawMsg of + 'empty' -> State1; + _ -> add_in_memory_counts(maps:get(size, Header), State1) + end, + {add_bytes_settle(Header, State2), Effects1}; _ -> %% this should not affect the release cursor in any way {add_bytes_return(RawMsg, @@ -1171,6 +1195,8 @@ checkout(#{index := Index}, State0, Effects0) -> {State, ok, Effects} end. +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, Acc) -> + checkout0(checkout_one(State), [send_log_effect(ConsumerId, RaftIdx, MsgId, Header) | Effects], Acc); checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> DelMsg = {MsgId, Msg}, Acc = maps:update_with(ConsumerId, @@ -1202,6 +1228,15 @@ evaluate_limit(OldIndexes, Result, {State0, Result, Effects0} end. +evaluate_memory_limit(_Size, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> + false; +evaluate_memory_limit(Size, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> + (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). + append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; append_send_msg_effects(Effects0, AccMap) -> @@ -1259,6 +1294,11 @@ take_next_msg(#?MODULE{returns = Returns, send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. +send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) -> + {log, RaftIdx, fun({enqueue, _, _, Msg}) -> + {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event} + end}. + checkout_one(#?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> @@ -1298,8 +1338,13 @@ checkout_one(#?MODULE{service_queue = SQ0, {'$prefix_msg', _} -> {add_bytes_checkout(ConsumerMsg, State1), ConsumerMsg}; - {_, {_, {_, RawMsg} = M}} -> - {add_bytes_checkout(RawMsg, State1), + {_, {_, {Header, 'empty'}} = M} -> + {add_bytes_checkout(maps:get(size, Header), State1), + M}; + {_, {_, {Header, RawMsg} = M}} -> + {subtract_in_memory_counts( + Header, + add_bytes_checkout(RawMsg, State1)), M} end, {success, ConsumerId, Next, Msg, State}; @@ -1505,8 +1550,7 @@ add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, State#?MODULE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) -> - Bytes = message_size(Msg), +add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, @@ -1515,6 +1559,17 @@ add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}. +add_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + msgs_ready_in_memory = InMemoryCount + 1}. + +subtract_in_memory_counts(#{size := Bytes}, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + msgs_ready_in_memory = InMemoryCount - 1}. + message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 968ae07739..c6c58820c6 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -108,7 +108,9 @@ %% whether single active consumer is on or not for this queue consumer_strategy = competing :: consumer_strategy(), %% the maximum number of unsuccessful delivery attempts permitted - delivery_limit :: maybe(non_neg_integer()) + delivery_limit :: maybe(non_neg_integer()), + max_in_memory_length :: maybe(non_neg_integer()), + max_in_memory_bytes :: maybe(non_neg_integer()) }). -record(rabbit_fifo, @@ -159,7 +161,9 @@ msg_bytes_checkout = 0 :: non_neg_integer(), %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}] + waiting_consumers = [] :: [{consumer_id(), consumer()}], + msg_bytes_in_memory = 0 :: non_neg_integer(), + msgs_ready_in_memory = 0 :: non_neg_integer() }). -type config() :: #{name := atom(), @@ -169,5 +173,7 @@ release_cursor_interval => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), + max_in_memory_length => non_neg_integer(), + max_in_memory_bytes => non_neg_integer(), single_active_consumer_on => boolean(), delivery_limit => non_neg_integer()}. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index b4501dbf84..7878bed02d 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,6 +41,8 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, + {policy_validator, <<"max-in-memory-length">>}, + {policy_validator, <<"max-in-memory-bytes">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, {policy_validator, <<"delivery-limit">>}, @@ -48,11 +50,15 @@ register() -> {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, {operator_policy_validator, <<"max-length-bytes">>}, + {operator_policy_validator, <<"max-in-memory-length">>}, + {operator_policy_validator, <<"max-in-memory-bytes">>}, {operator_policy_validator, <<"delivery-limit">>}, {policy_merge_strategy, <<"expires">>}, {policy_merge_strategy, <<"message-ttl">>}, {policy_merge_strategy, <<"max-length">>}, {policy_merge_strategy, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"max-in-memory-length">>}, + {policy_merge_strategy, <<"max-in-memory-bytes">>}, {policy_merge_strategy, <<"delivery-limit">>}]], ok. @@ -103,6 +109,18 @@ validate_policy0(<<"max-length-bytes">>, Value) validate_policy0(<<"max-length-bytes">>, Value) -> {error, "~p is not a valid maximum length in bytes", [Value]}; +validate_policy0(<<"max-in-memory-length">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-in-memory-length">>, Value) -> + {error, "~p is not a valid maximum memory in bytes", [Value]}; + +validate_policy0(<<"max-in-memory-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-in-memory-bytes">>, Value) -> + {error, "~p is not a valid maximum memory in bytes", [Value]}; + validate_policy0(<<"queue-mode">>, <<"default">>) -> ok; validate_policy0(<<"queue-mode">>, <<"lazy">>) -> @@ -125,5 +143,7 @@ validate_policy0(<<"delivery-limit">>, Value) -> merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 260e36d510..b640e04ed4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -164,6 +164,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), + MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), #{name => Name, queue_resource => QName, @@ -171,6 +173,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, + max_in_memory_length => MaxMemoryLength, + max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit }. |
