diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 270 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 14 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 18 |
6 files changed, 251 insertions, 75 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 1210e467ec..b3b47ccee6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -540,6 +540,8 @@ start_loaded_apps(Apps, RestartTypes) -> %% make Ra use a custom logger that dispatches to lager instead of the %% default OTP logger application:set_env(ra, logger_module, rabbit_log_ra_shim), + %% use a larger segments size for queues + application:set_env(ra, segment_max_entries, 32768), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of undefined -> []; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a8fd5a5081..fac9d5e50f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -685,6 +685,8 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}, {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, {<<"x-queue-mode">>, fun check_queue_mode/2}, diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1d53061a8a..609fa0111c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -46,6 +46,7 @@ query_consumers/1, query_stat/1, query_single_active_consumer/1, + query_in_memory_usage/1, usage/1, zero/1, @@ -125,7 +126,7 @@ init(#{name := Name, queue_resource := Resource} = Conf) -> update_config(Conf, #?MODULE{cfg = #cfg{name = Name, - resource = Resource}}). + resource = Resource}}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), @@ -133,6 +134,8 @@ update_config(Conf, State) -> SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), + MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined), + MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined), DeliveryLimit = maps:get(delivery_limit, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> @@ -146,6 +149,8 @@ update_config(Conf, State) -> become_leader_handler = BLH, max_length = MaxLength, max_bytes = MaxBytes, + max_in_memory_length = MaxMemoryLength, + max_in_memory_bytes = MaxMemoryBytes, consumer_strategy = ConsumerStrategy, delivery_limit = DeliveryLimit}}. @@ -252,9 +257,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% credit for unknown consumer - just ignore {State0, ok} end; -apply(Meta, #checkout{spec = {dequeue, Settlement}, - meta = ConsumerMeta, - consumer_id = ConsumerId}, +apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, + meta = ConsumerMeta, + consumer_id = ConsumerId}, #?MODULE{consumers = Consumers} = State0) -> Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of @@ -268,16 +273,23 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), - case Settlement of - unsettled -> - {_, Pid} = ConsumerId, - {State2, {dequeue, {MsgId, Msg}, Ready-1}, - [{monitor, process, Pid}]}; - settled -> - %% immediately settle the checkout - {State, _, Effects} = - apply(Meta, make_settle(ConsumerId, [MsgId]), - State2), + {State, Effects} = case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State3, _, Effects0} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), + {State3, Effects0} + end, + case Msg of + {RaftIdx, {Header, 'empty'}} -> + %% TODO add here new log effect with reply + {State, '$ra_no_reply', + reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)}; + _ -> {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -305,7 +317,9 @@ apply(#{index := RaftIdx}, #purge{}, returns = lqueue:new(), msg_bytes_enqueue = 0, prefix_msgs = {[], []}, - low_msg_num = undefined}, + low_msg_num = undefined, + msg_bytes_in_memory = 0, + msgs_ready_in_memory = 0}, []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves @@ -728,6 +742,10 @@ query_single_active_consumer(_) -> query_stat(#?MODULE{consumers = Consumers} = State) -> {messages_ready(State), maps:size(Consumers)}. +query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> + {Length, Bytes}. + -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> case ets:lookup(rabbit_fifo_usage, Name) of @@ -909,17 +927,23 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> case take_next_msg(State0) of - {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}}, + {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}}, State1} -> Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0), - Bytes = message_size(Msg), - State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}), + State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}), + State = case Msg of + 'empty' -> State2; + _ -> subtract_in_memory_counts(Header, State2) + end, Effects = dead_letter_effects(maxlen, #{none => FullMsg}, State, Effects0), {State, Effects}; - {{'$prefix_msg', #{size := Bytes}}, State1} -> - State = add_bytes_drop(Bytes, State1), - {State, Effects0}; + {{'$prefix_msg', Header}, State1} -> + State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)), + {State2, Effects0}; + {{'$empty_msg', Header}, State1} -> + State2 = add_bytes_drop(Header, State1), + {State2, Effects0}; empty -> {State0, Effects0} end. @@ -927,14 +951,21 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> - Size = message_size(RawMsg), - Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map - State = add_bytes_enqueue(Size, State0), + Header = #{size => message_size(RawMsg)}, + {State1, Msg} = + case evaluate_memory_limit(Header, State0) of + true -> + {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map + false -> + {add_in_memory_counts(Header, State0), + {RaftIdx, {Header, RawMsg}}} % indexed message with header map + end, + State = add_bytes_enqueue(Header, State1), State#?MODULE{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + %% this is probably only done to record it when low_msg_num + %% is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, #?MODULE{ra_indexes = Indexes0} = State0) -> @@ -1014,7 +1045,8 @@ snd(T) -> return(Meta, ConsumerId, Returned, Effects0, #?MODULE{service_queue = SQ0} = State0) -> {State1, Effects1} = maps:fold( - fun(MsgId, {'$prefix_msg', _} = Msg, {S0, E0}) -> + fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; + Tag == '$empty_msg'-> return_one(MsgId, 0, Msg, S0, E0, ConsumerId); (MsgId, {MsgNum, Msg}, {S0, E0}) -> return_one(MsgId, MsgNum, Msg, S0, E0, @@ -1042,10 +1074,12 @@ complete(ConsumerId, Discarded, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), - State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> - add_bytes_settle(RawMsg, Acc); - ({'$prefix_msg', _} = M, Acc) -> - add_bytes_settle(M, Acc) + State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc) end, State0, maps:values(Discarded)), {State1#?MODULE{consumers = Cons, ra_indexes = Indexes, @@ -1128,24 +1162,33 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(MsgId, 0, {'$prefix_msg', Header0}, +return_one(MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId) -> + Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {'$prefix_msg', Header}, + Msg0 = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0); + complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + {Msg, State1} = case Tag of + '$empty_msg' -> {Msg0, State0}; + _ -> case evaluate_memory_limit(Header, State0) of + true -> + {{'$empty_msg', Header}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, {add_bytes_return( - Msg, - State0#?MODULE{consumers = Consumers#{ConsumerId => Con}, + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in(Msg, Returns)}), Effects0} end; @@ -1157,19 +1200,28 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {RaftId, {Header, RawMsg}}, + Msg0 = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - DlMsg = {MsgNum, Msg}, + DlMsg = {MsgNum, Msg0}, Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, %% this should not affect the release cursor in any way + {Msg, State1} = case RawMsg of + 'empty' -> {Msg0, State0}; + _ -> case evaluate_memory_limit(Header, State0) of + true -> + {{RaftId, {Header, 'empty'}}, State0}; + false -> + {Msg0, add_in_memory_counts(Header, State0)} + end + end, {add_bytes_return( - RawMsg, - State0#?MODULE{consumers = Consumers#{ConsumerId => Con}, + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. @@ -1182,6 +1234,8 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); ({MsgId, {MsgNum, Msg}}, {S, E}) -> return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) end, {State, Effects0}, Checked). @@ -1190,7 +1244,7 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), - Effects0, #{}), + Effects0, {#{}, #{}}), case evaluate_limit(false, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); @@ -1198,19 +1252,26 @@ checkout(#{index := Index}, State0, Effects0) -> {State, ok, Effects} end. -checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, + {SendAcc, LogAcc0}) -> + DelMsg = {RaftIdx, {MsgId, Header}}, + LogAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], LogAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) -> DelMsg = {MsgId, Msg}, - Acc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], Acc0), - checkout0(checkout_one(State), Effects, Acc); -checkout0({Activity, State0}, Effects0, Acc) -> + SendAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], SendAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> Effects1 = case Activity of nochange -> - append_send_msg_effects(Effects0, Acc); + append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects(Effects0, Acc)] + | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1228,6 +1289,15 @@ evaluate_limit(Result, State0, Effects0) -> {State0, Result, Effects0} end. +evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined, + max_in_memory_bytes = undefined}}) -> + false; +evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength, + max_in_memory_bytes = MaxBytes}, + msg_bytes_in_memory = Bytes, + msgs_ready_in_memory = Length}) -> + (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). + append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; append_send_msg_effects(Effects0, AccMap) -> @@ -1236,6 +1306,11 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. +append_log_effects(Effects0, AccMap) -> + maps:fold(fun (C, Msgs, Ef) -> + [send_log_effect(C, lists:reverse(Msgs)) | Ef] + end, Effects0, AccMap). + %% next message is determined as follows: %% First we check if there are are prefex returns %% Then we check if there are current returns @@ -1244,6 +1319,9 @@ append_send_msg_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages +take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) -> + %% there are prefix returns, these should be served first + {Msg, State#?MODULE{prefix_msgs = {Rem, P}}}; take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, @@ -1276,15 +1354,38 @@ take_next_msg(#?MODULE{returns = Returns, end end; empty -> - [Header | Rem] = P, - %% There are prefix msgs - {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {R, Rem}}} + [Msg | Rem] = P, + case Msg of + {Header, 'empty'} -> + %% There are prefix msgs + {{'$empty_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}}; + Header -> + {{'$prefix_msg', Header}, + State#?MODULE{prefix_msgs = {R, Rem}}} + end end. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. +send_log_effect({CTag, CPid}, IdxMsgs) -> + {RaftIdxs, Data} = lists:unzip(IdxMsgs), + {log, RaftIdxs, + fun(Log) -> + Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + [{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}] + end}. + +reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> + {log, [RaftIdx], + fun([{enqueue, _, _, Msg}]) -> + [{reply, From, {wrap_reply, + {dequeue, {MsgId, {Header, Msg}}, Ready}}}] + end}. + checkout_one(#?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> @@ -1321,11 +1422,20 @@ checkout_one(#?MODULE{service_queue = SQ0, consumers = Cons}, {State, Msg} = case ConsumerMsg of - {'$prefix_msg', _} -> - {add_bytes_checkout(ConsumerMsg, State1), + {'$prefix_msg', Header} -> + {subtract_in_memory_counts( + Header, add_bytes_checkout(Header, State1)), + ConsumerMsg}; + {'$empty_msg', Header} -> + {add_bytes_checkout(Header, State1), ConsumerMsg}; - {_, {_, {_, RawMsg} = M}} -> - {add_bytes_checkout(RawMsg, State1), + {_, {_, {Header, 'empty'}} = M} -> + {add_bytes_checkout(Header, State1), + M}; + {_, {_, {Header, _} = M}} -> + {subtract_in_memory_counts( + Header, + add_bytes_checkout(Header, State1)), M} end, {success, ConsumerId, Next, Msg, State}; @@ -1439,13 +1549,19 @@ dehydrate_state(#?MODULE{messages = Messages, %% TODO: optimise this function as far as possible PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; + ({'$empty_msg', _} = Msg, Acc) -> + [Msg | Acc]; + ({_, {_, {Header, 'empty'}}}, Acc) -> + [{'$empty_msg', Header} | Acc]; ({_, {_, {Header, _}}}, Acc) -> [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> - [Header| Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) -> + [Msg | Acc]; + ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), @@ -1465,6 +1581,10 @@ dehydrate_state(#?MODULE{messages = Messages, dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; + (_, {'$empty_msg', _} = M) -> + M; + (_, {_, {_, {Header, 'empty'}}}) -> + {'$empty_msg', Header}; (_, {_, {_, {Header, _}}}) -> {'$prefix_msg', Header} end, Checked0), @@ -1523,33 +1643,43 @@ make_purge_nodes(Nodes) -> make_update_config(Config) -> #update_config{config = Config}. -add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> +add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}. -add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> +add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) -> State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue } = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout + Bytes, msg_bytes_enqueue = Enqueue - Bytes}. -add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) -> - Bytes = message_size(Msg), +add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) -> State#?MODULE{msg_bytes_checkout = Checkout - Bytes}. -add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout, +add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout, msg_bytes_enqueue = Enqueue} = State) -> - Bytes = message_size(Msg), State#?MODULE{msg_bytes_checkout = Checkout - Bytes, msg_bytes_enqueue = Enqueue + Bytes}. +add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes, + msgs_ready_in_memory = InMemoryCount + 1}. + +subtract_in_memory_counts(#{size := Bytes}, + #?MODULE{msg_bytes_in_memory = InMemoryBytes, + msgs_ready_in_memory = InMemoryCount} = State) -> + State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes, + msgs_ready_in_memory = InMemoryCount - 1}. + message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); message_size({'$prefix_msg', #{size := B}}) -> B; +message_size({'$empty_msg', #{size := B}}) -> + B; message_size(B) when is_binary(B) -> byte_size(B); message_size(Msg) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 968ae07739..be9dc682bb 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -108,7 +108,9 @@ %% whether single active consumer is on or not for this queue consumer_strategy = competing :: consumer_strategy(), %% the maximum number of unsuccessful delivery attempts permitted - delivery_limit :: maybe(non_neg_integer()) + delivery_limit :: maybe(non_neg_integer()), + max_in_memory_length :: maybe(non_neg_integer()), + max_in_memory_bytes :: maybe(non_neg_integer()) }). -record(rabbit_fifo, @@ -153,13 +155,15 @@ %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msgs = {[], []} :: {Return :: [msg_header()], - PrefixMsgs :: [msg_header()]}, + prefix_msgs = {[], []} :: {Return :: [msg_header() | {'$empty_msg', msg_header()}], + PrefixMsgs :: [msg_header() | {msg_header(), 'empty'}]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}] + waiting_consumers = [] :: [{consumer_id(), consumer()}], + msg_bytes_in_memory = 0 :: non_neg_integer(), + msgs_ready_in_memory = 0 :: non_neg_integer() }). -type config() :: #{name := atom(), @@ -169,5 +173,7 @@ release_cursor_interval => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), + max_in_memory_length => non_neg_integer(), + max_in_memory_bytes => non_neg_integer(), single_active_consumer_on => boolean(), delivery_limit => non_neg_integer()}. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index b4501dbf84..7878bed02d 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,6 +41,8 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, + {policy_validator, <<"max-in-memory-length">>}, + {policy_validator, <<"max-in-memory-bytes">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, {policy_validator, <<"delivery-limit">>}, @@ -48,11 +50,15 @@ register() -> {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, {operator_policy_validator, <<"max-length-bytes">>}, + {operator_policy_validator, <<"max-in-memory-length">>}, + {operator_policy_validator, <<"max-in-memory-bytes">>}, {operator_policy_validator, <<"delivery-limit">>}, {policy_merge_strategy, <<"expires">>}, {policy_merge_strategy, <<"message-ttl">>}, {policy_merge_strategy, <<"max-length">>}, {policy_merge_strategy, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"max-in-memory-length">>}, + {policy_merge_strategy, <<"max-in-memory-bytes">>}, {policy_merge_strategy, <<"delivery-limit">>}]], ok. @@ -103,6 +109,18 @@ validate_policy0(<<"max-length-bytes">>, Value) validate_policy0(<<"max-length-bytes">>, Value) -> {error, "~p is not a valid maximum length in bytes", [Value]}; +validate_policy0(<<"max-in-memory-length">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-in-memory-length">>, Value) -> + {error, "~p is not a valid maximum memory in bytes", [Value]}; + +validate_policy0(<<"max-in-memory-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-in-memory-bytes">>, Value) -> + {error, "~p is not a valid maximum memory in bytes", [Value]}; + validate_policy0(<<"queue-mode">>, <<"default">>) -> ok; validate_policy0(<<"queue-mode">>, <<"lazy">>) -> @@ -125,5 +143,7 @@ validate_policy0(<<"delivery-limit">>, Value) -> merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index cbd9980491..8258584e3c 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -61,7 +61,9 @@ members, open_files, single_active_consumer_pid, - single_active_consumer_ctag + single_active_consumer_ctag, + messages_ram, + message_bytes_ram ]). -define(RPC_TIMEOUT, 1000). @@ -164,6 +166,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q), + MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q), DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), #{name => Name, queue_resource => QName, @@ -171,6 +175,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, + max_in_memory_length => MaxMemoryLength, + max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit }. @@ -982,6 +988,16 @@ i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) -> '' end; i(type, _) -> quorum; +i(messages_ram, Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + {ok, {_, {Length, _}}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1), + Length; +i(message_bytes_ram, Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_in_memory_usage/1), + Bytes; i(_K, _Q) -> ''. open_files(Name) -> |
