diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 10 |
2 files changed, 87 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 2d5c267227..d365d41a96 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -22,6 +22,7 @@ -compile(inline). -include_lib("ra/include/ra.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([ init/1, @@ -42,6 +43,8 @@ query_consumers/1, usage/1, + zero/1, + %% misc dehydrate_state/1, @@ -226,7 +229,9 @@ %% This is done so that consumers are still served in a deterministic %% order on recovery. prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), - PrefixMsgs :: non_neg_integer()} + PrefixMsgs :: non_neg_integer()}, + msg_bytes_enqueue = 0 :: non_neg_integer(), + msg_bytes_checkout = 0 :: non_neg_integer() }). -opaque state() :: #state{}. @@ -266,6 +271,9 @@ update_state(Conf, State) -> become_leader_handler = BLH, shadow_copy_interval = SHI}. +zero(_) -> + 0. + % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue -spec apply(ra_machine:command_meta_data(), command(), @@ -275,7 +283,8 @@ apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, msg = RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of {ok, State0, Effects1} -> - {State, Effects, ok} = checkout(State0, Effects1), + {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0), + Effects1), {append_to_master_index(RaftIdx, State), Effects, ok}; {duplicate, State, Effects} -> {State, Effects, ok} @@ -543,12 +552,16 @@ tick(_Ts, #state{name = Name, queue_resource = QName, messages = Messages, ra_indexes = Indexes, - consumers = Cons} = State) -> + consumers = Cons, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, maps:size(Messages), % Ready num_checked_out(State), % checked out rabbit_fifo_index:size(Indexes), %% Total - maps:size(Cons)}, % Consumers + maps:size(Cons), % Consumers + EnqueueBytes, + CheckoutBytes}, [{mod_call, rabbit_quorum_queue, update_metrics, [QName, Metrics]}, {aux, emit}]. @@ -556,13 +569,18 @@ tick(_Ts, #state{name = Name, overview(#state{consumers = Cons, enqueuers = Enqs, messages = Messages, - ra_indexes = Indexes} = State) -> + ra_indexes = Indexes, + msg_bytes_enqueue = EnqueueBytes, + msg_bytes_checkout = CheckoutBytes + } = State) -> #{type => ?MODULE, num_consumers => maps:size(Cons), num_checked_out => num_checked_out(State), num_enqueuers => maps:size(Enqs), num_ready_messages => maps:size(Messages), - num_messages => rabbit_fifo_index:size(Indexes)}. + num_messages => rabbit_fifo_index:size(Indexes), + enqueue_message_bytes => EnqueueBytes, + checkout_message_bytes => CheckoutBytes}. -spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) -> [delivery_msg()]. @@ -806,12 +824,17 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], + State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) -> + add_bytes_settle(RawMsg, Acc); + (_, Acc) -> + Acc + end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension - {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, + {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, maps:size(Discarded), - Con0, Checked, Effects0, State0), - {State, Effects, _} = checkout(State1, Effects1), + Con0, Checked, Effects0, State1), + {State, Effects, _} = checkout(State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). @@ -873,8 +896,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way - State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = lqueue:in(MsgNum, Returns)}. + add_bytes_return(RawMsg, + State0#state{messages = maps:put(MsgNum, Msg, Messages), + returns = lqueue:in(MsgNum, Returns)}). return_all(State, Checked) -> maps:fold(fun (_, '$prefix_msg', S) -> @@ -993,13 +1017,16 @@ checkout_one(#state{service_queue = SQ0, {Cons, SQ, []} = % we expect no effects update_or_remove_sub(ConsumerId, Con, Cons0, SQ1, []), - State = State0#state{service_queue = SQ, - messages = Messages, - consumers = Cons}, - Msg = case ConsumerMsg of - '$prefix_msg' -> '$prefix_msg'; - {_, {_, M}} -> M - end, + State1 = State0#state{service_queue = SQ, + messages = Messages, + consumers = Cons}, + {State, Msg} = + case ConsumerMsg of + '$prefix_msg' -> + {State1, '$prefix_msg'}; + {_, {_, {_, RawMsg} = M}} -> + {add_bytes_checkout(RawMsg, State1), M} + end, {success, ConsumerId, Next, Msg, State}; error -> %% consumer did not exist but was queued, recurse @@ -1147,6 +1174,35 @@ make_purge() -> #purge{}. make_update_state(Config) -> #update_state{config = Config}. +add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> + Bytes = message_size(Msg), + State#state{msg_bytes_enqueue = Enqueue + Bytes}. + +add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue } = State) -> + Bytes = message_size(Msg), + State#state{msg_bytes_checkout = Checkout + Bytes, + msg_bytes_enqueue = Enqueue - Bytes}. + +add_bytes_settle(Msg, #state{msg_bytes_checkout = Checkout} = State) -> + Bytes = message_size(Msg), + State#state{msg_bytes_checkout = Checkout - Bytes}. + +add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, + msg_bytes_enqueue = Enqueue} = State) -> + Bytes = message_size(Msg), + State#state{msg_bytes_checkout = Checkout - Bytes, + msg_bytes_enqueue = Enqueue + Bytes}. + +message_size(#basic_message{content = Content}) -> + #content{payload_fragments_rev = PFR} = Content, + iolist_size(PFR); +message_size(B) when is_binary(B) -> + byte_size(B); +message_size(Msg) -> + %% probably only hit this for testing so ok to use erts_debug + erts_debug:size(Msg). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1170,7 +1226,8 @@ make_update_state(Config) -> test_init(Name) -> init(#{name => Name, - queue_resource => queue_resource, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(Name, utf8)), shadow_copy_interval => 0}). enq_enq_checkout_test() -> @@ -1529,13 +1586,15 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> tick_test() -> Cid = {<<"c">>, self()}, Cid2 = {<<"c2">>, self()}, - {S0, _} = enq(1, 1, fst, test_init(test)), - {S1, _} = enq(2, 2, snd, S0), + {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), + {S1, _} = enq(2, 2, <<"snd">>, S0), {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), {S3, {_, _}} = deq(4, Cid2, unsettled, S2), {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3), - [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), + [{mod_call, _, _, + [#resource{}, + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = tick(1, S4), ok. enq_deq_snapshot_recover_test() -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index bb8af13b9d..2394822763 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -150,8 +150,7 @@ ra_machine(Q) -> ra_machine_config(Q = #amqqueue{name = QName}) -> #{dead_letter_handler => dlx_mfa(Q), queue_resource => QName, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}. + become_leader_handler => {?MODULE, become_leader, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> Node = node(ChPid), @@ -198,14 +197,17 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -update_metrics(QName, {Name, MR, MU, M, C}) -> +update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> R = reductions(Name), rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), Util = case C of 0 -> 0; _ -> rabbit_fifo:usage(Name) end, - Infos = [{consumers, C}, {consumer_utilisation, Util} | infos(QName)], + Infos = [{consumers, C}, {consumer_utilisation, Util}, + {message_bytes_ready, MsgBytesReady}, + {message_bytes_unacknowledged, MsgBytesUnack}, + {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)], rabbit_core_metrics:queue_stats(QName, Infos), rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, {messages, M}, |
