summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichal Kuratczyk <mkuratczyk@pivotal.io>2021-04-29 16:35:46 +0200
committerMichal Kuratczyk <mkuratczyk@pivotal.io>2021-04-29 16:48:29 +0200
commit073db9ec6af17e4775272cd1963d3d3dfd9cd05d (patch)
tree72718467cb4821abfe8b6512555a72b2b0484a3d
parente6d25fd06c178d917996e3cd132de632e77470ed (diff)
downloadrabbitmq-server-git-global-counters-idea.tar.gz
Current progressglobal-counters-idea
* moved unroutable message handling to rabbit_queue_type * move more metric emitting to rabbit_queue_type * added tests for most metrics
-rw-r--r--deps/rabbit/src/rabbit_channel.erl7
-rw-r--r--deps/rabbit/src/rabbit_global_counters.erl196
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl85
-rw-r--r--deps/rabbit/test/queue_type_SUITE.erl80
-rw-r--r--deps/rabbit_common/src/rabbit_core_metrics.erl8
5 files changed, 269 insertions, 107 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 6d4d793c38..5f8c409d23 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2105,13 +2105,6 @@ notify_limiter(Limiter, Acked) ->
end
end.
-deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- confirm = false,
- mandatory = false},
- _RoutedToQs = []}, State) -> %% optimisation
- ?INCR_STATS(exchange_stats, XName, 1, publish, State),
- ?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
- State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
mandatory = Mandatory,
diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl
index 2e7ae40f94..bdaf44c23b 100644
--- a/deps/rabbit/src/rabbit_global_counters.erl
+++ b/deps/rabbit/src/rabbit_global_counters.erl
@@ -12,65 +12,71 @@
init/0,
overview/0,
prometheus_format/0,
- messages_received/1,
+ messages_published/1,
messages_routed/1,
- messages_acknowledged/1,
- channel_get_ack/1,
- channel_get_empty/1,
- channel_get/1,
- channel_messages_delivered_ack/1,
- channel_messages_delivered/1,
- channel_messages_redelivered/1
+ messages_delivered_consume_ack/1,
+ messages_delivered_consume_autoack/1,
+ messages_delivered_get_ack/1,
+ messages_delivered_get_autoack/1,
+ messages_redelivered/1,
+ basic_get_empty/1,
+ messages_unroutable_dropped/1,
+ messages_unroutable_returned/1
]).
--define(MESSAGES_RECEIVED, 1).
+-define(MESSAGES_PUBLISHED, 1).
-define(MESSAGES_ROUTED, 2).
--define(MESSAGES_ACKNOWLEDGED, 3).
--define(CHANNEL_GET_ACK, 4).
--define(CHANNEL_GET_EMPTY, 5).
--define(CHANNEL_GET, 6).
--define(CHANNEL_MESSAGES_DELIVERED_ACK, 7).
--define(CHANNEL_MESSAGES_DELIVERED, 8).
--define(CHANNEL_MESSAGES_REDELIVERED, 9).
+-define(MESSAGES_DELIVERED_CONSUME_ACK, 3).
+-define(MESSAGES_DELIVERED_CONSUME_AUTOACK, 4).
+-define(MESSAGES_DELIVERED_GET_ACK, 5).
+-define(MESSAGES_DELIVERED_GET_AUTOACK, 6).
+-define(MESSAGES_REDELIVERED, 7).
+-define(BASIC_GET_EMPTY, 8).
+-define(MESSAGES_UNROUTABLE_DROPPED, 9).
+-define(MESSAGES_UNROUTABLE_RETURNED, 10).
-define(COUNTERS,
[
{
- messages_received_total, ?MESSAGES_RECEIVED, counter,
- "Total number of messages received from clients"
- },
- {
- messages_acknowledged_total, ?MESSAGES_ACKNOWLEDGED, counter,
- "Total number of message acknowledgements received from consumers"
+ messages_published_total, ?MESSAGES_PUBLISHED, counter,
+ "Total number of messages published to queues and streams"
},
{
messages_routed_total, ?MESSAGES_ROUTED, counter,
"Total number of messages routed to queues"
},
{
- channel_get_ack_total, ?CHANNEL_GET_ACK, counter,
- "Total number of messages fetched with basic.get in manual acknowledgement mode"
+ messages_delivered_consume_ack_total, ?MESSAGES_DELIVERED_CONSUME_ACK, counter,
+ "Total number of messages consumed using basic.consume with manual acknowledgment"
},
{
- channel_get_empty_total, ?CHANNEL_GET_EMPTY, counter,
- "Total number of times basic.get operations fetched no message"
+ messages_delivered_consume_autoack_total, ?MESSAGES_DELIVERED_CONSUME_AUTOACK, counter,
+ "Total number of messages consumed using basic.consume with automatic acknowledgment"
+ },
+ {
+ messages_delivered_get_ack_total, ?MESSAGES_DELIVERED_GET_ACK, counter,
+ "Total number of messages consumed using basic.get with manual acknowledgment"
},
{
- channel_get_total, ?CHANNEL_GET, counter,
- "Total number of messages fetched with basic.get in automatic acknowledgement mode"
+ messages_delivered_get_autoack_total, ?MESSAGES_DELIVERED_GET_AUTOACK, counter,
+ "Total number of messages consumed using basic.get with automatic acknowledgment"
},
{
- channel_messages_delivered_ack_total, ?CHANNEL_MESSAGES_DELIVERED_ACK, counter,
- "Total number of messages delivered to consumers in manual acknowledgement mode"
+ messages_redelivered_total, ?MESSAGES_REDELIVERED, counter,
+ "Total number of messages redelivered to consumers"
},
{
- channel_messages_delivered_total, ?CHANNEL_MESSAGES_DELIVERED, counter,
- "Total number of messages delivered to consumers in automatic acknowledgement mode"
+ basic_get_empty_total, ?BASIC_GET_EMPTY, counter,
+ "Total number of times basic.get operations fetched no message"
},
{
- channel_messages_redelivered_total, ?CHANNEL_MESSAGES_REDELIVERED, counter,
- "Total number of messages redelivered to consumers"
+ messages_unroutable_dropped_total, ?MESSAGES_UNROUTABLE_DROPPED, counter,
+ "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
+ },
+ {
+ messages_unroutable_returned_total, ?MESSAGES_UNROUTABLE_RETURNED, counter,
+ "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
}
]).
@@ -100,33 +106,105 @@ prometheus_format() ->
[{Key, counters:get(Counters, Index), Type, Help} ||
{Key, Index, Type, Help} <- ?COUNTERS].
-messages_received(Num) ->
- counters:add(persistent_term:get(?MODULE), ?MESSAGES_RECEIVED, Num).
+% TODO - these are received by queues, not from clients (doesn't account for unroutable)
+messages_published(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_PUBLISHED, Num).
% formerly known as queue_messages_published_total
messages_routed(Num) ->
counters:add(persistent_term:get(?MODULE), ?MESSAGES_ROUTED, Num).
-%% doesn't work when consuming messages through the API
-messages_acknowledged(Num) ->
- counters:add(persistent_term:get(?MODULE), ?MESSAGES_ACKNOWLEDGED, Num).
-
-channel_get_ack(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET_ACK, Num).
-
-channel_get_empty(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET_EMPTY, Num).
-
-% TODO these are auto-ack only
-channel_get(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_GET, Num).
-
-channel_messages_delivered_ack(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_DELIVERED_ACK, Num).
-
-% TODO these are auto-ack only but the name doesn't reflect that
-channel_messages_delivered(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_DELIVERED, Num).
-
-channel_messages_redelivered(Num) ->
- counters:add(persistent_term:get(?MODULE), ?CHANNEL_MESSAGES_REDELIVERED, Num).
+messages_delivered_consume_ack(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_CONSUME_ACK, Num).
+
+messages_delivered_consume_autoack(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_CONSUME_AUTOACK, Num).
+
+messages_delivered_get_ack(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_GET_ACK, Num).
+
+messages_delivered_get_autoack(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_DELIVERED_GET_AUTOACK, Num).
+
+% not implemented yet
+messages_redelivered(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_REDELIVERED, Num).
+
+basic_get_empty(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?BASIC_GET_EMPTY, Num).
+
+% implemented in rabbit_core_metrics (it doesn't reach a queue)
+messages_unroutable_returned(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_UNROUTABLE_RETURNED, Num).
+
+% implemented in rabbit_core_metrics (it doesn't reach a queue)
+messages_unroutable_dropped(Num) ->
+ counters:add(persistent_term:get(?MODULE), ?MESSAGES_UNROUTABLE_DROPPED, Num).
+
+% TODO
+% channel_messages_redelivered_total "Total number of messages redelivered to consumers"
+%
+% connection_incoming_bytes_total "Total number of bytes received on a connection"
+% connection_outgoing_bytes_total "Total number of bytes sent on a connection"
+% connection_process_reductions_total "Total number of connection process reductions"
+% connection_incoming_packets_total "Total number of packets received on a connection"
+% connection_outgoing_packets_total "Total number of packets sent on a connection"
+%
+% io_read_ops_total "Total number of I/O read operations"
+% io_read_bytes_total "Total number of I/O bytes read"
+% io_write_ops_total "Total number of I/O write operations"
+% io_write_bytes_total "Total number of I/O bytes written"
+% io_sync_ops_total "Total number of I/O sync operations"
+% io_seek_ops_total "Total number of I/O seek operations"
+% io_open_attempt_ops_total "Total number of file open attempts"
+% io_reopen_ops_total "Total number of times files have been reopened"
+%
+% schema_db_ram_tx_total "Total number of Schema DB memory transactions"
+% schema_db_disk_tx_total "Total number of Schema DB disk transactions"
+% msg_store_read_total "Total number of Message Store read operations"
+% msg_store_write_total "Total number of Message Store write operations"
+% queue_index_read_ops_total "Total number of Queue Index read operations"
+% queue_index_write_ops_total "Total number of Queue Index write operations"
+% queue_index_journal_write_ops_total "Total number of Queue Index Journal write operations"
+% io_read_time_seconds_total "Total I/O read time"
+% io_write_time_seconds_total "Total I/O write time"
+% io_sync_time_seconds_total "Total I/O sync time"
+% io_seek_time_seconds_total "Total I/O seek time"
+% io_open_attempt_time_seconds_total "Total file open attempts time"
+% raft_term_total "Current Raft term number"
+% queue_disk_reads_total "Total number of times queue read messages from disk"
+% queue_disk_writes_total "Total number of times queue wrote messages to disk"
+
+% DONE
+% channel_messages_published_total "Total number of messages published into an exchange on a channel"
+% channel_messages_confirmed_total "Total number of messages published into an exchange and confirmed on the channel"
+% channel_messages_unroutable_returned_total "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"
+% channel_messages_unroutable_dropped_total "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"
+% channel_get_empty_total "Total number of times basic.get operations fetched no message"
+% channel_get_ack_total "Total number of messages fetched with basic.get in manual acknowledgement mode"
+% channel_get_total "Total number of messages fetched with basic.get in automatic acknowledgement mode"
+% channel_messages_delivered_ack_total "Total number of messages delivered to consumers in manual acknowledgement mode"
+% channel_messages_delivered_total "Total number of messages delivered to consumers in automatic acknowledgement mode"
+% queue_messages_published_total "Total number of messages published to queues"
+
+% IGNORED (IS THIS USEFUL?)
+% channel_process_reductions_total "Total number of channel process reductions"
+% queue_process_reductions_total "Total number of queue process reductions"
+
+% NOT NECESSARY (DON'T GO TO ZERO)
+% erlang_gc_runs_total "Total number of Erlang garbage collector runs"
+% erlang_gc_reclaimed_bytes_total "Total number of bytes of memory reclaimed by Erlang garbage collector"
+% erlang_scheduler_context_switches_total "Total number of Erlang scheduler context switches"
+% connections_opened_total "Total number of connections opened"
+% connections_closed_total "Total number of connections closed or terminated"
+% channels_opened_total "Total number of channels opened"
+% channels_closed_total "Total number of channels closed"
+% queues_declared_total "Total number of queues declared"
+% queues_created_total "Total number of queues created"
+% queues_deleted_total "Total number of queues deleted"
+% auth_attempts_total "Total number of authorization attempts"
+% auth_attempts_succeeded_total "Total number of successful authentication attempts"
+% auth_attempts_failed_total "Total number of failed authentication attempts"
+% auth_attempts_detailed_total "Total number of authorization attempts with source info"
+% auth_attempts_detailed_succeeded_total "Total number of successful authorization attempts with source info"
+% auth_attempts_detailed_failed_total "Total number of failed authorization attempts with source info" \ No newline at end of file
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 39b1b2cdf7..7877d55226 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -1,6 +1,6 @@
-module(rabbit_queue_type).
-include("amqqueue.hrl").
--include_lib("rabbit_common/include/resource.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([
init/0,
@@ -441,38 +441,55 @@ module(QRef, Ctxs) ->
stateless | state()) ->
{ok, state(), actions()}.
deliver(Qs, Delivery, stateless) ->
- rabbit_global_counters:messages_received(1),
- rabbit_global_counters:messages_routed(length(Qs)),
- _ = lists:map(fun(Q) ->
- Mod = amqqueue:get_type(Q),
- _ = Mod:deliver([{Q, stateless}], Delivery)
- end, Qs),
+ case Qs of
+ [] ->
+ case Delivery#delivery.mandatory of
+ false -> rabbit_global_counters:messages_unroutable_dropped(1);
+ true -> rabbit_global_counters:messages_unroutable_returned(1)
+ end;
+ _ ->
+ rabbit_global_counters:messages_routed(1),
+ _ = lists:map(fun(Q) ->
+ Mod = amqqueue:get_type(Q),
+ _ = Mod:deliver([{Q, stateless}], Delivery)
+ end, Qs),
+ rabbit_global_counters:messages_published(length(Qs))
+ end,
{ok, stateless, []};
deliver(Qs, Delivery, #?STATE{} = State0) ->
- rabbit_global_counters:messages_received(1),
- rabbit_global_counters:messages_routed(length(Qs)),
- %% TODO: optimise single queue case?
- %% sort by queue type - then dispatch each group
- ByType = lists:foldl(
- fun (Q, Acc) ->
- T = amqqueue:get_type(Q),
- Ctx = get_ctx(Q, State0),
- maps:update_with(
- T, fun (A) ->
- [{Q, Ctx#ctx.state} | A]
- end, [{Q, Ctx#ctx.state}], Acc)
- end, #{}, Qs),
- %%% dispatch each group to queue type interface?
- {Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) ->
- {X, A} = Mod:deliver(QTSs, Delivery),
- {X0 ++ X, A0 ++ A}
- end, {[], []}, ByType),
- State = lists:foldl(
- fun({Q, S}, Acc) ->
- Ctx = get_ctx_with(Q, Acc, S),
- set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
- end, State0, Xs),
- return_ok(State, Actions).
+ case Qs of
+ [] ->
+ case Delivery#delivery.mandatory of
+ false -> rabbit_global_counters:messages_unroutable_dropped(1);
+ true -> rabbit_global_counters:messages_unroutable_returned(1)
+ end,
+ return_ok(State0, []);
+ _ ->
+ rabbit_global_counters:messages_routed(1),
+ %% TODO: optimise single queue case?
+ %% sort by queue type - then dispatch each group
+ ByType = lists:foldl(
+ fun (Q, Acc) ->
+ T = amqqueue:get_type(Q),
+ Ctx = get_ctx(Q, State0),
+ maps:update_with(
+ T, fun (A) ->
+ [{Q, Ctx#ctx.state} | A]
+ end, [{Q, Ctx#ctx.state}], Acc)
+ end, #{}, Qs),
+ %%% dispatch each group to queue type interface?
+ {Xs, Actions} = maps:fold(fun(Mod, QTSs, {X0, A0}) ->
+ {X, A} = Mod:deliver(QTSs, Delivery),
+ rabbit_global_counters:messages_published(length(Qs)),
+ {X0 ++ X, A0 ++ A}
+ end, {[], []}, ByType),
+ State = lists:foldl(
+ fun({Q, S}, Acc) ->
+ Ctx = get_ctx_with(Q, Acc, S),
+ set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
+ end, State0, Xs),
+ return_ok(State, Actions)
+ end.
-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(),
@@ -481,7 +498,6 @@ deliver(Qs, Delivery, #?STATE{} = State0) ->
{'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
settle(QRef, Op, CTag, MsgIds, Ctxs)
when ?QREF(QRef) ->
- rabbit_global_counters:messages_acknowledged(length(MsgIds)),
case get_ctx(QRef, Ctxs, undefined) of
undefined ->
%% if we receive a settlement and there is no queue state it means
@@ -515,8 +531,13 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Mod = amqqueue:get_type(Q),
case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of
{ok, Num, Msg, State} ->
+ case NoAck of
+ false -> rabbit_global_counters:messages_delivered_get_ack(1);
+ true -> rabbit_global_counters:messages_delivered_get_autoack(1)
+ end,
{ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{empty, State} ->
+ rabbit_global_counters:basic_get_empty(1),
{empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
{error, _} = Err ->
Err;
diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl
index aed5ad4ccb..c9779a851b 100644
--- a/deps/rabbit/test/queue_type_SUITE.erl
+++ b/deps/rabbit/test/queue_type_SUITE.erl
@@ -170,6 +170,47 @@ smoke(Config) ->
end,
%% get and ack
basic_ack(Ch, basic_get(Ch, QName)),
+
+ %% published a dropped message
+ publish(Ch, <<"not_found">>, <<"dropped">>),
+
+ %% publish a returned message
+ publish(Ch, <<"amq.direct">>, <<"not_found">>, <<"returned">>, mandatory),
+
+ % publish a message routed to multiple queues (there are separate message counters)
+ ?assertEqual({'queue.declare_ok', <<"fanout1">>, 0, 0},
+ declare(Ch, <<"fanout1">>, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+
+ ?assertEqual({'queue.declare_ok', <<"fanout2">>, 0, 0},
+ declare(Ch, <<"fanout2">>, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+
+ bind(Ch, <<"amq.fanout">>, <<"fanout1">>),
+ bind(Ch, <<"amq.fanout">>, <<"fanout2">>),
+ publish(Ch, <<"amq.fanout">>, <<"foo">>, <<"fanout">>),
+
+ %% publish and get with manual ack
+ % publish(Ch, QName, <<"autoack">>),
+ % basic_get_autoack(Ch, QName),
+
+ close_channel(Ch),
+
+
+ ?assertEqual(get_global_counters(Config),
+ #{
+ basic_get_empty_total => 2,
+ messages_delivered_consume_ack_total => 0,
+ messages_delivered_consume_autoack_total => 0,
+ messages_delivered_get_ack_total => 2,
+ messages_delivered_get_autoack_total => 0,
+ messages_published_total => 5,
+ messages_redelivered_total => 0,
+ messages_routed_total => 4,
+ messages_unroutable_dropped_total => 1,
+ messages_unroutable_returned_total => 1
+ }
+ ),
ok.
ack_after_queue_delete(Config) ->
@@ -224,21 +265,43 @@ delete(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
publish(Ch, Queue, Msg) ->
+ publish(Ch, <<>>, Queue, Msg).
+
+publish(Ch, Exchange, RoutingKey, Msg) ->
ok = amqp_channel:cast(Ch,
- #'basic.publish'{routing_key = Queue},
+ #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = Msg}).
+publish(Ch, Exchange, RoutingKey, Msg, mandatory) ->
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{exchange = Exchange, routing_key = RoutingKey, mandatory = true},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg}),
+ receive
+ #'basic.cancel_ok'{} ->
+ ok;
+ #'basic.return'{} ->
+ ok
+ after 5000 ->
+ exit(basic_publish_timeout)
+ end.
+
basic_get(Ch, Queue) ->
{GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
no_ack = false}),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, Reply),
GetOk#'basic.get_ok'.delivery_tag.
+basic_get_autoack(Ch, Queue) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = true})).
+
basic_get_empty(Ch, Queue) ->
?assertMatch(#'basic.get_empty'{},
- amqp_channel:call(Ch, #'basic.get'{queue = Queue,
- no_ack = false})).
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = false})).
subscribe(Ch, Queue, CTag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
@@ -265,6 +328,14 @@ basic_nack(Ch, DTag) ->
requeue = true,
multiple = false}).
+bind(Ch, Exchange, Queue) ->
+ Binding = #'queue.bind'{queue = Queue,
+ exchange = Exchange},
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, Binding).
+
+close_channel(Ch) ->
+ amqp_channel:close(Ch).
+
flush() ->
receive
Any ->
@@ -273,3 +344,6 @@ flush() ->
after 0 ->
ok
end.
+
+get_global_counters(Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []).
diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl
index 6534242e1f..6b9dd0a9ae 100644
--- a/deps/rabbit_common/src/rabbit_core_metrics.erl
+++ b/deps/rabbit_common/src/rabbit_core_metrics.erl
@@ -193,32 +193,28 @@ channel_stats(queue_stats, get, Id, Value) ->
channel_stats(queue_stats, get_no_ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_get(Value),
ok;
channel_stats(queue_stats, deliver, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_messages_delivered_ack(Value),
+ rabbit_global_counters:messages_delivered_consume_ack(Value),
ok;
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_messages_delivered(Value),
+ rabbit_global_counters:messages_delivered_consume_autoack(Value),
ok;
channel_stats(queue_stats, redeliver, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_messages_redelivered(Value),
ok;
channel_stats(queue_stats, ack, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_get_ack(Value),
ok;
channel_stats(queue_stats, get_empty, Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
- rabbit_global_counters:channel_get_empty(Value),
ok.
delete(Table, Key) ->