summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-03-26 20:25:16 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-03-27 09:03:01 +0000
commitce55899c883839230f6e3a6546bb4db9087905bb (patch)
tree6e2013c790cfdc417e0cbc8f6b62210ce35902b1 /src
parent1b87e590a232b878fa1869aff26eef675093a9bc (diff)
downloadrabbitmq-server-git-ce55899c883839230f6e3a6546bb4db9087905bb.tar.gz
Quorum queue in memory message limits
[#164735591]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_fifo.erl85
-rw-r--r--src/rabbit_fifo.hrl10
-rw-r--r--src/rabbit_policies.erl20
-rw-r--r--src/rabbit_quorum_queue.erl4
5 files changed, 104 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a8fd5a5081..fac9d5e50f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -685,6 +685,8 @@ declare_args() ->
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index e43e94a385..549da8acbb 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -46,6 +46,7 @@
query_consumers/1,
query_stat/1,
query_single_active_consumer/1,
+ query_in_memory_usage/1,
usage/1,
zero/1,
@@ -130,6 +131,8 @@ update_config(Conf, State) ->
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
+ MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
+ MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
@@ -143,6 +146,8 @@ update_config(Conf, State) ->
become_leader_handler = BLH,
max_length = MaxLength,
max_bytes = MaxBytes,
+ max_in_memory_length = MaxMemoryLength,
+ max_in_memory_bytes = MaxMemoryBytes,
consumer_strategy = ConsumerStrategy,
delivery_limit = DeliveryLimit}}.
@@ -267,6 +272,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
{once, 1, simple_prefetch},
State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
+ %% TODO handle this checkout_one!!!
case Settlement of
unsettled ->
{_, Pid} = ConsumerId,
@@ -698,6 +704,10 @@ query_single_active_consumer(_) ->
query_stat(#?MODULE{consumers = Consumers} = State) ->
{messages_ready(State), maps:size(Consumers)}.
+query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ {Length, Bytes}.
+
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
case ets:lookup(rabbit_fifo_usage, Name) of
@@ -878,11 +888,14 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
case take_next_msg(State0) of
- {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {#{size := Bytes} = Header, Msg}}},
State1} ->
Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
- Bytes = message_size(Msg),
- State = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State1 = add_bytes_drop(Bytes, State1#?MODULE{ra_indexes = Indexes}),
+ State = case Msg of
+ 'empty' -> subtract_in_memory_counts(Header, State1);
+ _ -> State1
+ end,
Effects = dead_letter_effects(maxlen, #{none => FullMsg},
State, Effects0),
{State, Effects};
@@ -897,8 +910,15 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
Size = message_size(RawMsg),
- Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
- State = add_bytes_enqueue(Size, State0),
+ {State1, Msg} =
+ case evaluate_memory_limit(Size, State0) of
+ true ->
+ {State0, {RaftIdx, {#{size => Size}, 'empty'}}}; % indexed message with header map
+ false ->
+ {add_in_memory_counts(Size, State0),
+ {RaftIdx, {#{size => Size}, RawMsg}}} % indexed message with header map
+ end,
+ State = add_bytes_enqueue(Size, State1),
State#?MODULE{messages = Messages#{NextMsgNum => Msg},
% this is probably only done to record it when low_msg_num
% is undefined
@@ -1033,10 +1053,10 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
- State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
- add_bytes_settle(RawMsg, Acc);
- ({'$prefix_msg', _} = M, Acc) ->
- add_bytes_settle(M, Acc)
+ State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
@@ -1115,7 +1135,7 @@ return_one(0, {'$prefix_msg', Header0},
Checked = Con#consumer.checked_out,
{State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
Effects0, State0),
- {add_bytes_settle(Msg, State1), Effects};
+ {add_bytes_settle(Header, State1), Effects};
_ ->
%% this should not affect the release cursor in any way
{add_bytes_return(Msg,
@@ -1139,7 +1159,11 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Checked = Con#consumer.checked_out,
{State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
Effects, State0),
- {add_bytes_settle(RawMsg, State1), Effects1};
+ State2 = case RawMsg of
+ 'empty' -> State1;
+ _ -> add_in_memory_counts(maps:get(size, Header), State1)
+ end,
+ {add_bytes_settle(Header, State2), Effects1};
_ ->
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
@@ -1171,6 +1195,8 @@ checkout(#{index := Index}, State0, Effects0) ->
{State, ok, Effects}
end.
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, Acc) ->
+ checkout0(checkout_one(State), [send_log_effect(ConsumerId, RaftIdx, MsgId, Header) | Effects], Acc);
checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
DelMsg = {MsgId, Msg},
Acc = maps:update_with(ConsumerId,
@@ -1202,6 +1228,15 @@ evaluate_limit(OldIndexes, Result,
{State0, Result, Effects0}
end.
+evaluate_memory_limit(_Size, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
+ false;
+evaluate_memory_limit(Size, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
+
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
append_send_msg_effects(Effects0, AccMap) ->
@@ -1259,6 +1294,11 @@ take_next_msg(#?MODULE{returns = Returns,
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
+send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) ->
+ {log, RaftIdx, fun({enqueue, _, _, Msg}) ->
+ {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event}
+ end}.
+
checkout_one(#?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->
@@ -1298,8 +1338,13 @@ checkout_one(#?MODULE{service_queue = SQ0,
{'$prefix_msg', _} ->
{add_bytes_checkout(ConsumerMsg, State1),
ConsumerMsg};
- {_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1),
+ {_, {_, {Header, 'empty'}} = M} ->
+ {add_bytes_checkout(maps:get(size, Header), State1),
+ M};
+ {_, {_, {Header, RawMsg} = M}} ->
+ {subtract_in_memory_counts(
+ Header,
+ add_bytes_checkout(RawMsg, State1)),
M}
end,
{success, ConsumerId, Next, Msg, State};
@@ -1505,8 +1550,7 @@ add_bytes_checkout(Msg, #?MODULE{msg_bytes_checkout = Checkout,
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
msg_bytes_enqueue = Enqueue - Bytes}.
-add_bytes_settle(Msg, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
- Bytes = message_size(Msg),
+add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
@@ -1515,6 +1559,17 @@ add_bytes_return(Msg, #?MODULE{msg_bytes_checkout = Checkout,
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
msg_bytes_enqueue = Enqueue + Bytes}.
+add_in_memory_counts(Bytes, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ msgs_ready_in_memory = InMemoryCount + 1}.
+
+subtract_in_memory_counts(#{size := Bytes},
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ msgs_ready_in_memory = InMemoryCount - 1}.
+
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 968ae07739..c6c58820c6 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -108,7 +108,9 @@
%% whether single active consumer is on or not for this queue
consumer_strategy = competing :: consumer_strategy(),
%% the maximum number of unsuccessful delivery attempts permitted
- delivery_limit :: maybe(non_neg_integer())
+ delivery_limit :: maybe(non_neg_integer()),
+ max_in_memory_length :: maybe(non_neg_integer()),
+ max_in_memory_bytes :: maybe(non_neg_integer())
}).
-record(rabbit_fifo,
@@ -159,7 +161,9 @@
msg_bytes_checkout = 0 :: non_neg_integer(),
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ msg_bytes_in_memory = 0 :: non_neg_integer(),
+ msgs_ready_in_memory = 0 :: non_neg_integer()
}).
-type config() :: #{name := atom(),
@@ -169,5 +173,7 @@
release_cursor_interval => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
+ max_in_memory_length => non_neg_integer(),
+ max_in_memory_bytes => non_neg_integer(),
single_active_consumer_on => boolean(),
delivery_limit => non_neg_integer()}.
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index b4501dbf84..7878bed02d 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -41,6 +41,8 @@ register() ->
{policy_validator, <<"expires">>},
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
+ {policy_validator, <<"max-in-memory-length">>},
+ {policy_validator, <<"max-in-memory-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
{policy_validator, <<"delivery-limit">>},
@@ -48,11 +50,15 @@ register() ->
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
{operator_policy_validator, <<"max-length-bytes">>},
+ {operator_policy_validator, <<"max-in-memory-length">>},
+ {operator_policy_validator, <<"max-in-memory-bytes">>},
{operator_policy_validator, <<"delivery-limit">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
{policy_merge_strategy, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"max-in-memory-length">>},
+ {policy_merge_strategy, <<"max-in-memory-bytes">>},
{policy_merge_strategy, <<"delivery-limit">>}]],
ok.
@@ -103,6 +109,18 @@ validate_policy0(<<"max-length-bytes">>, Value)
validate_policy0(<<"max-length-bytes">>, Value) ->
{error, "~p is not a valid maximum length in bytes", [Value]};
+validate_policy0(<<"max-in-memory-length">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-in-memory-length">>, Value) ->
+ {error, "~p is not a valid maximum memory in bytes", [Value]};
+
+validate_policy0(<<"max-in-memory-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-in-memory-bytes">>, Value) ->
+ {error, "~p is not a valid maximum memory in bytes", [Value]};
+
validate_policy0(<<"queue-mode">>, <<"default">>) ->
ok;
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
@@ -125,5 +143,7 @@ validate_policy0(<<"delivery-limit">>, Value) ->
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 260e36d510..b640e04ed4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -164,6 +164,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
+ MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
@@ -171,6 +173,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
+ max_in_memory_length => MaxMemoryLength,
+ max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit
}.