diff options
author | Michal Kuratczyk <mkuratczyk@pivotal.io> | 2021-04-29 16:35:46 +0200 |
---|---|---|
committer | Michal Kuratczyk <mkuratczyk@pivotal.io> | 2021-04-29 16:48:29 +0200 |
commit | 073db9ec6af17e4775272cd1963d3d3dfd9cd05d (patch) | |
tree | 72718467cb4821abfe8b6512555a72b2b0484a3d | |
parent | e6d25fd06c178d917996e3cd132de632e77470ed (diff) | |
download | rabbitmq-server-git-global-counters-idea.tar.gz |
Current progressglobal-counters-idea
* moved unroutable message handling to rabbit_queue_type
* move more metric emitting to rabbit_queue_type
* added tests for most metrics
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 7 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_global_counters.erl | 196 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 85 | ||||
-rw-r--r-- | deps/rabbit/test/queue_type_SUITE.erl | 80 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_core_metrics.erl | 8 |
5 files changed, 269 insertions, 107 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 6d4d793c38..5f8c409d23 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2105,13 +2105,6 @@ notify_limiter(Limiter, Acked) -> end end. -deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, - confirm = false, - mandatory = false}, - _RoutedToQs = []}, State) -> %% optimisation - ?INCR_STATS(exchange_stats, XName, 1, publish, State), - ?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State), - State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, mandatory = Mandatory, diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 2e7ae40f94..bdaf44c23b 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -12,65 +12,71 @@ init/0, overview/0, prometheus_format/0, - messages_received/1, + messages_published/1, messages_routed/1, - messages_acknowledged/1, - channel_get_ack/1, - channel_get_empty/1, - channel_get/1, - channel_messages_delivered_ack/1, - channel_messages_delivered/1, - channel_messages_redelivered/1 + messages_delivered_consume_ack/1, + messages_delivered_consume_autoack/1, + messages_delivered_get_ack/1, + messages_delivered_get_autoack/1, + messages_redelivered/1, + basic_get_empty/1, + messages_unroutable_dropped/1, + messages_unroutable_returned/1 ]). --define(MESSAGES_RECEIVED, 1). +-define(MESSAGES_PUBLISHED, 1). -define(MESSAGES_ROUTED, 2). --define(MESSAGES_ACKNOWLEDGED, 3). --define(CHANNEL_GET_ACK, 4). --define(CHANNEL_GET_EMPTY, 5). --define(CHANNEL_GET, 6). --define(CHANNEL_MESSAGES_DELIVERED_ACK, 7). --define(CHANNEL_MESSAGES_DELIVERED, 8). --define(CHANNEL_MESSAGES_REDELIVERED, 9). +-define(MESSAGES_DELIVERED_CONSUME_ACK, 3). +-define(MESSAGES_DELIVERED_CONSUME_AUTOACK, 4). +-define(MESSAGES_DELIVERED_GET_ACK, 5). +-define(MESSAGES_DELIVERED_GET_AUTOACK, 6). +-define(MESSAGES_REDELIVERED, 7). +-define(BASIC_GET_EMPTY, 8). +-define(MESSAGES_UNROUTABLE_DROPPED, 9). +-define(MESSAGES_UNROUTABLE_RETURNED, 10). -define(COUNTERS, [ { - messages_received_total, ?MESSAGES_RECEIVED, counter, - "Total number of messages received from clients" - }, - { - messages_acknowledged_total, ?MESSAGES_ACKNOWLEDGED, counter, - "Total number of message acknowledgements received from consumers" + messages_published_total, ?MESSAGES_PUBLISHED, counter, + "Total number of messages published to queues and streams" }, { messages_routed_total, ?MESSAGES_ROUTED, counter, "Total number of messages routed to queues" }, { - channel_get_ack_total, ?CHANNEL_GET_ACK, counter, - "Total number of messages fetched with basic.get in manual acknowledgement mode" + messages_delivered_consume_ack_total, ?MESSAGES_DELIVERED_CONSUME_ACK, counter, + "Total number of messages consumed using basic.consume with manual acknowledgment" }, { - channel_get_empty_total, ?CHANNEL_GET_EMPTY, counter, - "Total number of times basic.get operations fetched no message" + messages_delivered_consume_autoack_total, ?MESSAGES_DELIVERED_CONSUME_AUTOACK, counter, + "Total number of messages consumed using basic.consume with automatic acknowledgment" + }, + { + messages_delivered_get_ack_total, ?MESSAGES_DELIVERED_GET_ACK, counter, + "Total number of messages consumed using basic.get with manual acknowledgment" }, { - channel_get_total, ?CHANNEL_GET, counter, - "Total number of messages fetched with basic.get in automatic acknowledgement mode" + messages_delivered_get_autoack_total, ?MESSAGES_DELIVERED_GET_AUTOACK, counter, + "Total number of messages consumed using basic.get with automatic acknowledgment" }, { - channel_messages_delivered_ack_total, ?CHANNEL_MESSAGES_DELIVERED_ACK, counter, - "Total number of messages delivered to consumers in manual acknowledgement mode" + messages_redelivered_total, ?MESSAGES_REDELIVERED, counter, + "Total number of messages redelivered to consumers" }, { - channel_messages_delivered_total, ?CHANNEL_MESSAGES_DELIVERED, counter, - "Total number of messages delivered to consumers in automatic acknowledgement mode" + basic_get_empty_total, ?BASIC_GET_EMPTY, counter, + "Total number of times basic.get operations fetched no message" }, { - channel_messages_redelivered_total, ?CHANNEL_MESSAGES_REDELIVERED, counter, - "Total number of messages redelivered to consumers" + messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter, + "Total number of messages published as non-mandatory into an exchange and dropped as unroutable" + }, + { + messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter, + "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable" } ]). @@ -100,33 +106,105 @@ prometheus_format() -> [{Key, counters:get(Counters, Index), Type, Help} || {Key, Index, Type, Help} <- ?COUNTERS]. -messages_received(Num) -> - counters:add(persistent_term:get(?MODULE), ?MESSAGES_RECEIVED, Num). +% TODO - these are received by queues, not from clients (doesn't account for unroutable) +messages_published(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_PUBLISHED, Num). % formerly known as queue_messages_published_total messages_routed(Num) -> counters:add(persistent_term:get(?MODULE), ?MESSAGES_ROUTED, Num). -%% doesn't work when consuming messages through the API -messages_acknowledged(Num) -> - counters:add(persistent_term:get(?MODULE), ?MESSAGES_ACKNOWLEDGED, Num). - -channel_get_ack(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET_ACK, Num). - -channel_get_empty(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET_EMPTY, Num). - -% TODO these are auto-ack only -channel_get(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET, Num). - -channel_messages_delivered_ack(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_DELIVERED_ACK, Num). - -% TODO these are auto-ack only but the name doesn't reflect that -channel_messages_delivered(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_DELIVERED, Num). - -channel_messages_redelivered(Num) -> - counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_REDELIVERED, Num). +messages_delivered_consume_ack(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_CONSUME_ACK, Num). + +messages_delivered_consume_autoack(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_CONSUME_AUTOACK, Num). + +messages_delivered_get_ack(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_GET_ACK, Num). + +messages_delivered_get_autoack(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_GET_AUTOACK, Num). + +% not implemented yet +messages_redelivered(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_REDELIVERED, Num). + +basic_get_empty(Num) -> + counters:add(persistent_term:get(?MODULE), ?BASIC_GET_EMPTY, Num). + +% implemented in rabbit_core_metrics (it doesn't reach a queue) +messages_unroutable_returned(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_UNROUTABLE_RETURNED, Num). + +% implemented in rabbit_core_metrics (it doesn't reach a queue) +messages_unroutable_dropped(Num) -> + counters:add(persistent_term:get(?MODULE), ?MESSAGES_UNROUTABLE_DROPPED, Num). + +% TODO +% channel_messages_redelivered_total "Total number of messages redelivered to consumers" +% +% connection_incoming_bytes_total "Total number of bytes received on a connection" +% connection_outgoing_bytes_total "Total number of bytes sent on a connection" +% connection_process_reductions_total "Total number of connection process reductions" +% connection_incoming_packets_total "Total number of packets received on a connection" +% connection_outgoing_packets_total "Total number of packets sent on a connection" +% +% io_read_ops_total "Total number of I/O read operations" +% io_read_bytes_total "Total number of I/O bytes read" +% io_write_ops_total "Total number of I/O write operations" +% io_write_bytes_total "Total number of I/O bytes written" +% io_sync_ops_total "Total number of I/O sync operations" +% io_seek_ops_total "Total number of I/O seek operations" +% io_open_attempt_ops_total "Total number of file open attempts" +% io_reopen_ops_total "Total number of times files have been reopened" +% +% schema_db_ram_tx_total "Total number of Schema DB memory transactions" +% schema_db_disk_tx_total "Total number of Schema DB disk transactions" +% msg_store_read_total "Total number of Message Store read operations" +% msg_store_write_total "Total number of Message Store write operations" +% queue_index_read_ops_total "Total number of Queue Index read operations" +% queue_index_write_ops_total "Total number of Queue Index write operations" +% queue_index_journal_write_ops_total "Total number of Queue Index Journal write operations" +% io_read_time_seconds_total "Total I/O read time" +% io_write_time_seconds_total "Total I/O write time" +% io_sync_time_seconds_total "Total I/O sync time" +% io_seek_time_seconds_total "Total I/O seek time" +% io_open_attempt_time_seconds_total "Total file open attempts time" +% raft_term_total "Current Raft term number" +% queue_disk_reads_total "Total number of times queue read messages from disk" +% queue_disk_writes_total "Total number of times queue wrote messages to disk" + +% DONE +% channel_messages_published_total "Total number of messages published into an exchange on a channel" +% channel_messages_confirmed_total "Total number of messages published into an exchange and confirmed on the channel" +% channel_messages_unroutable_returned_total "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable" +% channel_messages_unroutable_dropped_total "Total number of messages published as non-mandatory into an exchange and dropped as unroutable" +% channel_get_empty_total "Total number of times basic.get operations fetched no message" +% channel_get_ack_total "Total number of messages fetched with basic.get in manual acknowledgement mode" +% channel_get_total "Total number of messages fetched with basic.get in automatic acknowledgement mode" +% channel_messages_delivered_ack_total "Total number of messages delivered to consumers in manual acknowledgement mode" +% channel_messages_delivered_total "Total number of messages delivered to consumers in automatic acknowledgement mode" +% queue_messages_published_total "Total number of messages published to queues" + +% IGNORED (IS THIS USEFUL?) +% channel_process_reductions_total "Total number of channel process reductions" +% queue_process_reductions_total "Total number of queue process reductions" + +% NOT NECESSARY (DON'T GO TO ZERO) +% erlang_gc_runs_total "Total number of Erlang garbage collector runs" +% erlang_gc_reclaimed_bytes_total "Total number of bytes of memory reclaimed by Erlang garbage collector" +% erlang_scheduler_context_switches_total "Total number of Erlang scheduler context switches" +% connections_opened_total "Total number of connections opened" +% connections_closed_total "Total number of connections closed or terminated" +% channels_opened_total "Total number of channels opened" +% channels_closed_total "Total number of channels closed" +% queues_declared_total "Total number of queues declared" +% queues_created_total "Total number of queues created" +% queues_deleted_total "Total number of queues deleted" +% auth_attempts_total "Total number of authorization attempts" +% auth_attempts_succeeded_total "Total number of successful authentication attempts" +% auth_attempts_failed_total "Total number of failed authentication attempts" +% auth_attempts_detailed_total "Total number of authorization attempts with source info" +% auth_attempts_detailed_succeeded_total "Total number of successful authorization attempts with source info" +% auth_attempts_detailed_failed_total "Total number of failed authorization attempts with source info"
\ No newline at end of file diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 39b1b2cdf7..7877d55226 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -1,6 +1,6 @@ -module(rabbit_queue_type). -include("amqqueue.hrl"). --include_lib("rabbit_common/include/resource.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([ init/0, @@ -441,38 +441,55 @@ module(QRef, Ctxs) -> stateless | state()) -> {ok, state(), actions()}. deliver(Qs, Delivery, stateless) -> - rabbit_global_counters:messages_received(1), - rabbit_global_counters:messages_routed(length(Qs)), - _ = lists:map(fun(Q) -> - Mod = amqqueue:get_type(Q), - _ = Mod:deliver([{Q, stateless}], Delivery) - end, Qs), + case Qs of + [] -> + case Delivery#delivery.mandatory of + false -> rabbit_global_counters:messages_unroutable_dropped(1); + true -> rabbit_global_counters:messages_unroutable_returned(1) + end; + _ -> + rabbit_global_counters:messages_routed(1), + _ = lists:map(fun(Q) -> + Mod = amqqueue:get_type(Q), + _ = Mod:deliver([{Q, stateless}], Delivery) + end, Qs), + rabbit_global_counters:messages_published(length(Qs)) + end, {ok, stateless, []}; deliver(Qs, Delivery, #?STATE{} = State0) -> - rabbit_global_counters:messages_received(1), - rabbit_global_counters:messages_routed(length(Qs)), - %% TODO: optimise single queue case? - %% sort by queue type - then dispatch each group - ByType = lists:foldl( - fun (Q, Acc) -> - T = amqqueue:get_type(Q), - Ctx = get_ctx(Q, State0), - maps:update_with( - T, fun (A) -> - [{Q, Ctx#ctx.state} | A] - end, [{Q, Ctx#ctx.state}], Acc) - end, #{}, Qs), - %%% dispatch each group to queue type interface? - {Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) -> - {X, A} = Mod:deliver(QTSs, Delivery), - {X0 ++ X, A0 ++ A} - end, {[], []}, ByType), - State = lists:foldl( - fun({Q, S}, Acc) -> - Ctx = get_ctx_with(Q, Acc, S), - set_ctx(qref(Q), Ctx#ctx{state = S}, Acc) - end, State0, Xs), - return_ok(State, Actions). + case Qs of + [] -> + case Delivery#delivery.mandatory of + false -> rabbit_global_counters:messages_unroutable_dropped(1); + true -> rabbit_global_counters:messages_unroutable_returned(1) + end, + return_ok(State0, []); + _ -> + rabbit_global_counters:messages_routed(1), + %% TODO: optimise single queue case? + %% sort by queue type - then dispatch each group + ByType = lists:foldl( + fun (Q, Acc) -> + T = amqqueue:get_type(Q), + Ctx = get_ctx(Q, State0), + maps:update_with( + T, fun (A) -> + [{Q, Ctx#ctx.state} | A] + end, [{Q, Ctx#ctx.state}], Acc) + end, #{}, Qs), + %%% dispatch each group to queue type interface? + {Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) -> + {X, A} = Mod:deliver(QTSs, Delivery), + rabbit_global_counters:messages_published(length(Qs)), + {X0 ++ X, A0 ++ A} + end, {[], []}, ByType), + State = lists:foldl( + fun({Q, S}, Acc) -> + Ctx = get_ctx_with(Q, Acc, S), + set_ctx(qref(Q), Ctx#ctx{state = S}, Acc) + end, State0, Xs), + return_ok(State, Actions) + end. -spec settle(queue_ref(), settle_op(), rabbit_types:ctag(), @@ -481,7 +498,6 @@ deliver(Qs, Delivery, #?STATE{} = State0) -> {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}. settle(QRef, Op, CTag, MsgIds, Ctxs) when ?QREF(QRef) -> - rabbit_global_counters:messages_acknowledged(length(MsgIds)), case get_ctx(QRef, Ctxs, undefined) of undefined -> %% if we receive a settlement and there is no queue state it means @@ -515,8 +531,13 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> Mod = amqqueue:get_type(Q), case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of {ok, Num, Msg, State} -> + case NoAck of + false -> rabbit_global_counters:messages_delivered_get_ack(1); + true -> rabbit_global_counters:messages_delivered_get_autoack(1) + end, {ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; {empty, State} -> + rabbit_global_counters:basic_get_empty(1), {empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)}; {error, _} = Err -> Err; diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index aed5ad4ccb..c9779a851b 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -170,6 +170,47 @@ smoke(Config) -> end, %% get and ack basic_ack(Ch, basic_get(Ch, QName)), + + %% published a dropped message + publish(Ch, <<"not_found">>, <<"dropped">>), + + %% publish a returned message + publish(Ch, <<"amq.direct">>, <<"not_found">>, <<"returned">>, mandatory), + + % publish a message routed to multiple queues (there are separate message counters) + ?assertEqual({'queue.declare_ok', <<"fanout1">>, 0, 0}, + declare(Ch, <<"fanout1">>, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + + ?assertEqual({'queue.declare_ok', <<"fanout2">>, 0, 0}, + declare(Ch, <<"fanout2">>, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + + bind(Ch, <<"amq.fanout">>, <<"fanout1">>), + bind(Ch, <<"amq.fanout">>, <<"fanout2">>), + publish(Ch, <<"amq.fanout">>, <<"foo">>, <<"fanout">>), + + %% publish and get with manual ack + % publish(Ch, QName, <<"autoack">>), + % basic_get_autoack(Ch, QName), + + close_channel(Ch), + + + ?assertEqual(get_global_counters(Config), + #{ + basic_get_empty_total => 2, + messages_delivered_consume_ack_total => 0, + messages_delivered_consume_autoack_total => 0, + messages_delivered_get_ack_total => 2, + messages_delivered_get_autoack_total => 0, + messages_published_total => 5, + messages_redelivered_total => 0, + messages_routed_total => 4, + messages_unroutable_dropped_total => 1, + messages_unroutable_returned_total => 1 + } + ), ok. ack_after_queue_delete(Config) -> @@ -224,21 +265,43 @@ delete(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). publish(Ch, Queue, Msg) -> + publish(Ch, <<>>, Queue, Msg). + +publish(Ch, Exchange, RoutingKey, Msg) -> ok = amqp_channel:cast(Ch, - #'basic.publish'{routing_key = Queue}, + #'basic.publish'{exchange = Exchange, routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = Msg}). +publish(Ch, Exchange, RoutingKey, Msg, mandatory) -> + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = Exchange, routing_key = RoutingKey, mandatory = true}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg}), + receive + #'basic.cancel_ok'{} -> + ok; + #'basic.return'{} -> + ok + after 5000 -> + exit(basic_publish_timeout) + end. + basic_get(Ch, Queue) -> {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue, no_ack = false}), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply), GetOk#'basic.get_ok'.delivery_tag. +basic_get_autoack(Ch, Queue) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = true})). + basic_get_empty(Ch, Queue) -> ?assertMatch(#'basic.get_empty'{}, - amqp_channel:call(Ch, #'basic.get'{queue = Queue, - no_ack = false})). + amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = false})). subscribe(Ch, Queue, CTag) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, @@ -265,6 +328,14 @@ basic_nack(Ch, DTag) -> requeue = true, multiple = false}). +bind(Ch, Exchange, Queue) -> + Binding = #'queue.bind'{queue = Queue, + exchange = Exchange}, + #'queue.bind_ok'{} = amqp_channel:call(Ch, Binding). + +close_channel(Ch) -> + amqp_channel:close(Ch). + flush() -> receive Any -> @@ -273,3 +344,6 @@ flush() -> after 0 -> ok end. + +get_global_counters(Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []). diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 6534242e1f..6b9dd0a9ae 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -193,32 +193,28 @@ channel_stats(queue_stats, get, Id, Value) -> channel_stats(queue_stats, get_no_ack, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_get(Value), ok; channel_stats(queue_stats, deliver, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_messages_delivered_ack(Value), + rabbit_global_counters:messages_delivered_consume_ack(Value), ok; channel_stats(queue_stats, deliver_no_ack, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_messages_delivered(Value), + rabbit_global_counters:messages_delivered_consume_autoack(Value), ok; channel_stats(queue_stats, redeliver, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_messages_redelivered(Value), ok; channel_stats(queue_stats, ack, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_get_ack(Value), ok; channel_stats(queue_stats, get_empty, Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), - rabbit_global_counters:channel_get_empty(Value), ok. delete(Table, Key) -> |