summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-12-10 12:36:56 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-14 13:52:09 +0000
commit3676541e7a30b69333ea887450c8ddf6e3c3510d (patch)
tree2822189583461010a9da9c04c3a24bd8e61be64f /src
parent685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e (diff)
downloadrabbitmq-server-git-3676541e7a30b69333ea887450c8ddf6e3c3510d.tar.gz
Report message bytes in quorum queue stats
[#161505138]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl103
-rw-r--r--src/rabbit_quorum_queue.erl10
2 files changed, 87 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 2d5c267227..d365d41a96 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -22,6 +22,7 @@
-compile(inline).
-include_lib("ra/include/ra.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([
init/1,
@@ -42,6 +43,8 @@
query_consumers/1,
usage/1,
+ zero/1,
+
%% misc
dehydrate_state/1,
@@ -226,7 +229,9 @@
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()}
+ PrefixMsgs :: non_neg_integer()},
+ msg_bytes_enqueue = 0 :: non_neg_integer(),
+ msg_bytes_checkout = 0 :: non_neg_integer()
}).
-opaque state() :: #state{}.
@@ -266,6 +271,9 @@ update_state(Conf, State) ->
become_leader_handler = BLH,
shadow_copy_interval = SHI}.
+zero(_) ->
+ 0.
+
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
-spec apply(ra_machine:command_meta_data(), command(),
@@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
msg = RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects1} ->
- {State, Effects, ok} = checkout(State0, Effects1),
+ {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
+ Effects1),
{append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
@@ -543,12 +552,16 @@ tick(_Ts, #state{name = Name,
queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- consumers = Cons} = State) ->
+ consumers = Cons,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
- maps:size(Cons)}, % Consumers
+ maps:size(Cons), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
update_metrics, [QName, Metrics]}, {aux, emit}].
@@ -556,13 +569,18 @@ tick(_Ts, #state{name = Name,
overview(#state{consumers = Cons,
enqueuers = Enqs,
messages = Messages,
- ra_indexes = Indexes} = State) ->
+ ra_indexes = Indexes,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes
+ } = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => maps:size(Messages),
- num_messages => rabbit_fifo_index:size(Indexes)}.
+ num_messages => rabbit_fifo_index:size(Indexes),
+ enqueue_message_bytes => EnqueueBytes,
+ checkout_message_bytes => CheckoutBytes}.
-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
[delivery_msg()].
@@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
+ add_bytes_settle(RawMsg, Acc);
+ (_, Acc) ->
+ Acc
+ end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
- {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
+ {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
- Con0, Checked, Effects0, State0),
- {State, Effects, _} = checkout(State1, Effects1),
+ Con0, Checked, Effects0, State1),
+ {State, Effects, _} = checkout(State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}.
+ add_bytes_return(RawMsg,
+ State0#state{messages = maps:put(MsgNum, Msg, Messages),
+ returns = lqueue:in(MsgNum, Returns)}).
return_all(State, Checked) ->
maps:fold(fun (_, '$prefix_msg', S) ->
@@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0,
{Cons, SQ, []} = % we expect no effects
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
- State = State0#state{service_queue = SQ,
- messages = Messages,
- consumers = Cons},
- Msg = case ConsumerMsg of
- '$prefix_msg' -> '$prefix_msg';
- {_, {_, M}} -> M
- end,
+ State1 = State0#state{service_queue = SQ,
+ messages = Messages,
+ consumers = Cons},
+ {State, Msg} =
+ case ConsumerMsg of
+ '$prefix_msg' ->
+ {State1, '$prefix_msg'};
+ {_, {_, {_, RawMsg} = M}} ->
+ {add_bytes_checkout(RawMsg, State1), M}
+ end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
@@ -1147,6 +1174,35 @@ make_purge() -> #purge{}.
make_update_state(Config) ->
#update_state{config = Config}.
+add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+
+add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout + Bytes,
+ msg_bytes_enqueue = Enqueue - Bytes}.
+
+add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout - Bytes}.
+
+add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State) ->
+ Bytes = message_size(Msg),
+ State#state{msg_bytes_checkout = Checkout - Bytes,
+ msg_bytes_enqueue = Enqueue + Bytes}.
+
+message_size(#basic_message{content = Content}) ->
+ #content{payload_fragments_rev = PFR} = Content,
+ iolist_size(PFR);
+message_size(B) when is_binary(B) ->
+ byte_size(B);
+message_size(Msg) ->
+ %% probably only hit this for testing so ok to use erts_debug
+ erts_debug:size(Msg).
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1170,7 +1226,8 @@ make_update_state(Config) ->
test_init(Name) ->
init(#{name => Name,
- queue_resource => queue_resource,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(Name, utf8)),
shadow_copy_interval => 0}).
enq_enq_checkout_test() ->
@@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
tick_test() ->
Cid = {<<"c">>, self()},
Cid2 = {<<"c2">>, self()},
- {S0, _} = enq(1, 1, fst, test_init(test)),
- {S1, _} = enq(2, 2, snd, S0),
+ {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)),
+ {S1, _} = enq(2, 2, <<"snd">>, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
- [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
+ [{mod_call, _, _,
+ [#resource{},
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4),
ok.
enq_deq_snapshot_recover_test() ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index bb8af13b9d..2394822763 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -150,8 +150,7 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
queue_resource => QName,
- become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}.
+ become_leader_handler => {?MODULE, become_leader, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
@@ -198,14 +197,17 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-update_metrics(QName, {Name, MR, MU, M, C}) ->
+update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
R = reductions(Name),
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
Util = case C of
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
- Infos = [{consumers, C}, {consumer_utilisation, Util} | infos(QName)],
+ Infos = [{consumers, C}, {consumer_utilisation, Util},
+ {message_bytes_ready, MsgBytesReady},
+ {message_bytes_unacknowledged, MsgBytesUnack},
+ {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
{messages, M},