summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2019-04-11 17:05:19 +0100
committerGitHub <noreply@github.com>2019-04-11 17:05:19 +0100
commit96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb (patch)
tree31471f84fa456235647628b708e28c8edbdbdc7e /src
parent46211bfa4de1aa8c532d16709823971127a9c552 (diff)
parent2ad83292a53a7d86ad56c7adff245d28989e9b32 (diff)
downloadrabbitmq-server-git-96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb.tar.gz
Merge pull request #1970 from rabbitmq/in-memory-limits
In memory limits for quorum queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_fifo.erl270
-rw-r--r--src/rabbit_fifo.hrl14
-rw-r--r--src/rabbit_policies.erl20
-rw-r--r--src/rabbit_quorum_queue.erl18
6 files changed, 251 insertions, 75 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1210e467ec..b3b47ccee6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -540,6 +540,8 @@ start_loaded_apps(Apps, RestartTypes) ->
%% make Ra use a custom logger that dispatches to lager instead of the
%% default OTP logger
application:set_env(ra, logger_module, rabbit_log_ra_shim),
+ %% use a larger segments size for queues
+ application:set_env(ra, segment_max_entries, 32768),
ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of
undefined ->
[];
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a8fd5a5081..fac9d5e50f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -685,6 +685,8 @@ declare_args() ->
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 1d53061a8a..609fa0111c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -46,6 +46,7 @@
query_consumers/1,
query_stat/1,
query_single_active_consumer/1,
+ query_in_memory_usage/1,
usage/1,
zero/1,
@@ -125,7 +126,7 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
- resource = Resource}}).
+ resource = Resource}}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
@@ -133,6 +134,8 @@ update_config(Conf, State) ->
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
+ MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
+ MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
@@ -146,6 +149,8 @@ update_config(Conf, State) ->
become_leader_handler = BLH,
max_length = MaxLength,
max_bytes = MaxBytes,
+ max_in_memory_length = MaxMemoryLength,
+ max_in_memory_bytes = MaxMemoryBytes,
consumer_strategy = ConsumerStrategy,
delivery_limit = DeliveryLimit}}.
@@ -252,9 +257,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(Meta, #checkout{spec = {dequeue, Settlement},
- meta = ConsumerMeta,
- consumer_id = ConsumerId},
+apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
+ consumer_id = ConsumerId},
#?MODULE{consumers = Consumers} = State0) ->
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
@@ -268,16 +273,23 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
{once, 1, simple_prefetch},
State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
- case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, {dequeue, {MsgId, Msg}, Ready-1},
- [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State, _, Effects} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
+ {State, Effects} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ case Msg of
+ {RaftIdx, {Header, 'empty'}} ->
+ %% TODO add here new log effect with reply
+ {State, '$ra_no_reply',
+ reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ _ ->
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
end
end;
@@ -305,7 +317,9 @@ apply(#{index := RaftIdx}, #purge{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
prefix_msgs = {[], []},
- low_msg_num = undefined},
+ low_msg_num = undefined,
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
[]),
%% as we're not checking out after a purge (no point) we have to
%% reverse the effects ourselves
@@ -728,6 +742,10 @@ query_single_active_consumer(_) ->
query_stat(#?MODULE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
+query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ {Length, Bytes}.
+
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
case ets:lookup(rabbit_fifo_usage, Name) of
@@ -909,17 +927,23 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
- {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- Bytes = message_size(Msg),
- State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}),
+ State = case Msg of
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
+ end,
Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
- {{'$prefix_msg', #{size := Bytes}}, State1} ->
- State = add_bytes_drop(Bytes, State1),
- {State, Effects0};
+ {{'$prefix_msg', Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', Header}, State1} ->
+ State2 = add_bytes_drop(Header, State1),
+ {State2, Effects0};
empty ->
{State0, Effects0}
end.
@@ -927,14 +951,21 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
- Size = message_size(RawMsg),
- Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
- State = add_bytes_enqueue(Size, State0),
+ Header = #{size => message_size(RawMsg)},
+ {State1, Msg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map
+ false ->
+ {add_in_memory_counts(Header, State0),
+ {RaftIdx, {Header, RawMsg}}} % indexed message with header map
+ end,
+ State = add_bytes_enqueue(Header, State1),
State#?MODULE{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ %% this is probably only done to record it when low_msg_num
+ %% is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
#?MODULE{ra_indexes = Indexes0} = State0) ->
@@ -1014,7 +1045,8 @@ snd(T) ->
return(Meta, ConsumerId, Returned,
Effects0, #?MODULE{service_queue = SQ0} = State0) ->
{State1, Effects1} = maps:fold(
- fun(MsgId, {'$prefix_msg', _} = Msg, {S0, E0}) ->
+ fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
(MsgId, {MsgNum, Msg}, {S0, E0}) ->
return_one(MsgId, MsgNum, Msg, S0, E0,
@@ -1042,10 +1074,12 @@ complete(ConsumerId, Discarded,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
- State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
- add_bytes_settle(RawMsg, Acc);
- ({'$prefix_msg', _} = M, Acc) ->
- add_bytes_settle(M, Acc)
+ State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
end, State0, maps:values(Discarded)),
{State1#?MODULE{consumers = Cons,
ra_indexes = Indexes,
@@ -1128,24 +1162,33 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(MsgId, 0, {'$prefix_msg', Header0},
+return_one(MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId) ->
+ Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
- Msg = {'$prefix_msg', Header},
+ Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0);
+ complete(ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ {Msg, State1} = case Tag of
+ '$empty_msg' -> {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{'$empty_msg', Header}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
{add_bytes_return(
- Msg,
- State0#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
@@ -1157,19 +1200,28 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
- Msg = {RaftId, {Header, RawMsg}},
+ Msg0 = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- DlMsg = {MsgNum, Msg},
+ DlMsg = {MsgNum, Msg0},
Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
+ {Msg, State1} = case RawMsg of
+ 'empty' -> {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
{add_bytes_return(
- RawMsg,
- State0#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
returns = lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
@@ -1182,6 +1234,8 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
({MsgId, {MsgNum, Msg}}, {S, E}) ->
return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
end, {State, Effects0}, Checked).
@@ -1190,7 +1244,7 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
- Effects0, #{}),
+ Effects0, {#{}, #{}}),
case evaluate_limit(false, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
@@ -1198,19 +1252,26 @@ checkout(#{index := Index}, State0, Effects0) ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects,
+ {SendAcc, LogAcc0}) ->
+ DelMsg = {RaftIdx, {MsgId, Header}},
+ LogAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], LogAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
- Acc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], Acc0),
- checkout0(checkout_one(State), Effects, Acc);
-checkout0({Activity, State0}, Effects0, Acc) ->
+ SendAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], SendAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
- append_send_msg_effects(Effects0, Acc);
+ append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc);
inactive ->
[{aux, inactive}
- | append_send_msg_effects(Effects0, Acc)]
+ | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)]
end,
{State0, ok, lists:reverse(Effects1)}.
@@ -1228,6 +1289,15 @@ evaluate_limit(Result, State0, Effects0) ->
{State0, Result, Effects0}
end.
+evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
+ false;
+evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
+
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
append_send_msg_effects(Effects0, AccMap) ->
@@ -1236,6 +1306,11 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
+append_log_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, Msgs, Ef) ->
+ [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap).
+
%% next message is determined as follows:
%% First we check if there are are prefex returns
%% Then we check if there are current returns
@@ -1244,6 +1319,9 @@ append_send_msg_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
+take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?MODULE{prefix_msgs = {Rem, P}}};
take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
@@ -1276,15 +1354,38 @@ take_next_msg(#?MODULE{returns = Returns,
end
end;
empty ->
- [Header | Rem] = P,
- %% There are prefix msgs
- {{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {R, Rem}}}
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {R, Rem}}}
+ end
end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
+send_log_effect({CTag, CPid}, IdxMsgs) ->
+ {RaftIdxs, Data} = lists:unzip(IdxMsgs),
+ {log, RaftIdxs,
+ fun(Log) ->
+ Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ [{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}]
+ end}.
+
+reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
+ {log, [RaftIdx],
+ fun([{enqueue, _, _, Msg}]) ->
+ [{reply, From, {wrap_reply,
+ {dequeue, {MsgId, {Header, Msg}}, Ready}}}]
+ end}.
+
checkout_one(#?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
@@ -1321,11 +1422,20 @@ checkout_one(#?MODULE{service_queue = SQ0,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- {'$prefix_msg', _} ->
- {add_bytes_checkout(ConsumerMsg, State1),
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
ConsumerMsg};
- {_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1),
+ {_, {_, {Header, 'empty'}} = M} ->
+ {add_bytes_checkout(Header, State1),
+ M};
+ {_, {_, {Header, _} = M}} ->
+ {subtract_in_memory_counts(
+ Header,
+ add_bytes_checkout(Header, State1)),
M}
end,
{success, ConsumerId, Next, Msg, State};
@@ -1439,13 +1549,19 @@ dehydrate_state(#?MODULE{messages = Messages,
%% TODO: optimise this function as far as possible
PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
({_, {_, {Header, _}}}, Acc) ->
[Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
- [Header| Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) ->
+ [Msg | Acc];
+ ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
@@ -1465,6 +1581,10 @@ dehydrate_state(#?MODULE{messages = Messages,
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
+ (_, {'$empty_msg', _} = M) ->
+ M;
+ (_, {_, {_, {Header, 'empty'}}}) ->
+ {'$empty_msg', Header};
(_, {_, {_, {Header, _}}}) ->
{'$prefix_msg', Header}
end, Checked0),
@@ -1523,33 +1643,43 @@ make_purge_nodes(Nodes) ->
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-add_bytes_drop(Bytes, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
+add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
- Bytes = message_size(Msg),
+add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
+add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
+add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ msgs_ready_in_memory = InMemoryCount + 1}.
+
+subtract_in_memory_counts(#{size := Bytes},
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ msgs_ready_in_memory = InMemoryCount - 1}.
+
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
message_size({'$prefix_msg', #{size := B}}) ->
B;
+message_size({'$empty_msg', #{size := B}}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 968ae07739..be9dc682bb 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -108,7 +108,9 @@
%% whether single active consumer is on or not for this queue
consumer_strategy = competing :: consumer_strategy(),
%% the maximum number of unsuccessful delivery attempts permitted
- delivery_limit :: maybe(non_neg_integer())
+ delivery_limit :: maybe(non_neg_integer()),
+ max_in_memory_length :: maybe(non_neg_integer()),
+ max_in_memory_bytes :: maybe(non_neg_integer())
}).
-record(rabbit_fifo,
@@ -153,13 +155,15 @@
%% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_header()],
- PrefixMsgs :: [msg_header()]},
+ prefix_msgs = {[], []} :: {Return :: [msg_header() | {'$empty_msg', msg_header()}],
+ PrefixMsgs :: [msg_header() | {msg_header(), 'empty'}]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ msg_bytes_in_memory = 0 :: non_neg_integer(),
+ msgs_ready_in_memory = 0 :: non_neg_integer()
}).
-type config() :: #{name := atom(),
@@ -169,5 +173,7 @@
release_cursor_interval => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
+ max_in_memory_length => non_neg_integer(),
+ max_in_memory_bytes => non_neg_integer(),
single_active_consumer_on => boolean(),
delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index b4501dbf84..7878bed02d 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -41,6 +41,8 @@ register() ->
{policy_validator, <<"expires">>},
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
+ {policy_validator, <<"max-in-memory-length">>},
+ {policy_validator, <<"max-in-memory-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
{policy_validator, <<"delivery-limit">>},
@@ -48,11 +50,15 @@ register() ->
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
{operator_policy_validator, <<"max-length-bytes">>},
+ {operator_policy_validator, <<"max-in-memory-length">>},
+ {operator_policy_validator, <<"max-in-memory-bytes">>},
{operator_policy_validator, <<"delivery-limit">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
{policy_merge_strategy, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"max-in-memory-length">>},
+ {policy_merge_strategy, <<"max-in-memory-bytes">>},
{policy_merge_strategy, <<"delivery-limit">>}]],
ok.
@@ -103,6 +109,18 @@ validate_policy0(<<"max-length-bytes">>, Value)
validate_policy0(<<"max-length-bytes">>, Value) ->
{error, "~p is not a valid maximum length in bytes", [Value]};
+validate_policy0(<<"max-in-memory-length">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-in-memory-length">>, Value) ->
+ {error, "~p is not a valid maximum memory in bytes", [Value]};
+
+validate_policy0(<<"max-in-memory-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-in-memory-bytes">>, Value) ->
+ {error, "~p is not a valid maximum memory in bytes", [Value]};
+
validate_policy0(<<"queue-mode">>, <<"default">>) ->
ok;
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
@@ -125,5 +143,7 @@ validate_policy0(<<"delivery-limit">>, Value) ->
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index cbd9980491..8258584e3c 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -61,7 +61,9 @@
members,
open_files,
single_active_consumer_pid,
- single_active_consumer_ctag
+ single_active_consumer_ctag,
+ messages_ram,
+ message_bytes_ram
]).
-define(RPC_TIMEOUT, 1000).
@@ -164,6 +166,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
+ MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
@@ -171,6 +175,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
+ max_in_memory_length => MaxMemoryLength,
+ max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit
}.
@@ -982,6 +988,16 @@ i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
''
end;
i(type, _) -> quorum;
+i(messages_ram, Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ {ok, {_, {Length, _}}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1),
+ Length;
+i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1),
+ Bytes;
i(_K, _Q) -> ''.
open_files(Name) ->