summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl103
-rw-r--r--src/rabbit_quorum_queue.erl10
-rw-r--r--test/quorum_queue_SUITE.erl82
3 files changed, 165 insertions, 30 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 2d5c267227..d365d41a96 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -22,6 +22,7 @@
-compile(inline).
-include_lib("ra/include/ra.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([
init/1,
@@ -42,6 +43,8 @@
query_consumers/1,
usage/1,
+ zero/1,
+
%% misc
dehydrate_state/1,
@@ -226,7 +229,9 @@
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()}
+ PrefixMsgs :: non_neg_integer()},
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer()
}).
-opaque state() :: #state{}.
@@ -266,6 +271,9 @@ update_state(Conf, State) ->
become_leader_handler = BLH,
shadow_copy_interval = SHI}.
+zero(_) ->
+ 0.
+
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
-spec apply(ra_machine:command_meta_data(), command(),
@@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
msg = RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects1} ->
- {State, Effects, ok} = checkout(State0, Effects1),
+ {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
+ Effects1),
{append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
@@ -543,12 +552,16 @@ tick(_Ts, #state{name = Name,
queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- consumers = Cons} = State) ->
+ consumers = Cons,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
- maps:size(Cons)}, % Consumers
+ maps:size(Cons), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
update_metrics, [QName, Metrics]}, {aux, emit}].
@@ -556,13 +569,18 @@ tick(_Ts, #state{name = Name,
overview(#state{consumers = Cons,
enqueuers = Enqs,
messages = Messages,
- ra_indexes = Indexes} = State) ->
+ ra_indexes = Indexes,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes
+ } = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => maps:size(Messages),
- num_messages => rabbit_fifo_index:size(Indexes)}.
+ num_messages => rabbit_fifo_index:size(Indexes),
+ enqueue_message_bytes => EnqueueBytes,
+ checkout_message_bytes => CheckoutBytes}.
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
@@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
+ add_bytes_settle(RawMsg, Acc);
+ (_, Acc) ->
+ Acc
+ end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
- {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
- Con0, Checked, Effects0, State0),
- {State, Effects, _} = checkout(State1, Effects1),
+ Con0, Checked, Effects0, State1),
+ {State, Effects, _} = checkout(State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}.
+ add_bytes_return(RawMsg,
+ State0#state{messages = maps:put(MsgNum, Msg, Messages),
+ returns = lqueue:in(MsgNum, Returns)}).
return_all(State, Checked) ->
maps:fold(fun (_, '$prefix_msg', S) ->
@@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
- State = State0#state{service_queue = SQ,
- messages = Messages,
- consumers = Cons},
- Msg = case ConsumerMsg of
- '$prefix_msg' -> '$prefix_msg';
- {_, {_, M}} -> M
- end,
+ State1 = State0#state{service_queue = SQ,
+ messages = Messages,
+ consumers = Cons},
+ {State, Msg} =
+ case ConsumerMsg of
+ '$prefix_msg' ->
+ {State1, '$prefix_msg'};
+ {_, {_, {_, RawMsg} = M}} ->
+ {add_bytes_checkout(RawMsg, State1), M}
+ end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
@@ -1147,6 +1174,35 @@ make_purge() -> #purge{}.
make_update_state(Config) ->
#update_state{config = Config}.
+add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+
+add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout + Bytes,
+ msg_bytes_enqueue = Enqueue - Bytes}.
+
+add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout - Bytes}.
+
+add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout - Bytes,
+ msg_bytes_enqueue = Enqueue + Bytes}.
+
+message_size(#basic_message{content = Content}) ->
+ #content{payload_fragments_rev = PFR} = Content,
+ iolist_size(PFR);
+message_size(B) when is_binary(B) ->
+ byte_size(B);
+message_size(Msg) ->
+ %% probably only hit this for testing so ok to use erts_debug
+ erts_debug:size(Msg).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1170,7 +1226,8 @@ make_update_state(Config) ->
test_init(Name) ->
init(#{name => Name,
- queue_resource => queue_resource,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(Name, utf8)),
shadow_copy_interval => 0}).
enq_enq_checkout_test() ->
@@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
tick_test() ->
Cid = {<<"c">>, self()},
Cid2 = {<<"c2">>, self()},
- {S0, _} = enq(1, 1, fst, test_init(test)),
- {S1, _} = enq(2, 2, snd, S0),
+ {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
+ {S1, _} = enq(2, 2, <<"snd">>, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
- [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
+ [{mod_call, _, _,
+ [#resource{},
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
ok.
enq_deq_snapshot_recover_test() ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index bb8af13b9d..2394822763 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -150,8 +150,7 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
queue_resource => QName,
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}.
+ become_leader_handler => {?MODULE, become_leader, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
@@ -198,14 +197,17 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-update_metrics(QName, {Name, MR, MU, M, C}) ->
+update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
R = reductions(Name),
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
Util = case C of
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
- Infos = [{consumers, C}, {consumer_utilisation, Util} | infos(QName)],
+ Infos = [{consumers, C}, {consumer_utilisation, Util},
+ {message_bytes_ready, MsgBytesReady},
+ {message_bytes_unacknowledged, MsgBytesUnack},
+ {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, M},
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index c94fb9ddab..d698618494 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -120,6 +120,7 @@ all_tests() ->
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
+ message_bytes_metrics,
memory_alarm_rolls_wal
].
@@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) ->
requeue = true}),
%% wait for requeueing
timer:sleep(500),
-
+
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) ->
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
multiple = false,
- requeue = true}).
+ requeue = true}),
+ ok.
-memory_alarm_rolls_wal(Config) ->
+message_bytes_metrics(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+
+ publish(Ch, QQ),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {3, 3, 0} == get_message_bytes(Leader, QRes)
+ end),
+
+ subscribe(Ch, QQ, false),
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_until(fun() ->
+ {3, 0, 3} == get_message_bytes(Leader, QRes)
+ end),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {0, 0, 0} == get_message_bytes(Leader, QRes)
+ end)
+ end,
+
+ %% Let's publish and then close the consumer channel. Messages must be
+ %% returned to the queue
+ publish(Ch, QQ),
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_until(fun() ->
+ {3, 0, 3} == get_message_bytes(Leader, QRes)
+ end),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {3, 3, 0} == get_message_bytes(Leader, QRes)
+ end),
+ ok.
+
+memory_alarm_rolls_wal(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
ok = rpc:call(Server, rabbit_alarm, set_alarm,
@@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) ->
[{{resource_limit, memory, Server}, []}]),
timer:sleep(1000),
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
- ?assert(Wal1 == Wal2).
+ ?assert(Wal1 == Wal2),
+ ok = rpc:call(Server, rabbit_alarm, clear_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ ok.
%%----------------------------------------------------------------------------
@@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) ->
fun(N) ->
case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
{ok, {_, Msgs}, _} ->
+ ct:pal("Msgs ~w", [Msgs]),
Msgs;
_ ->
undefined
@@ -2258,3 +2321,14 @@ delete_queues() ->
stop_node(Config, Server) ->
rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]).
+
+get_message_bytes(Leader, QRes) ->
+ case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of
+ [{QRes, Props, _}] ->
+ {proplists:get_value(message_bytes, Props),
+ proplists:get_value(message_bytes_ready, Props),
+ proplists:get_value(message_bytes_unacknowledged, Props)};
+ _ ->
+ []
+ end.
+