summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_fifo.erl18
-rw-r--r--src/rabbit_quorum_queue.erl29
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl2
5 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 38454d79e0..5782bc6e23 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1296,17 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
QName = qname(State1),
AckRequired = not NoAck,
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
- ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of
- {true, TheConsumer} ->
- true;
- {true, _} ->
- false;
- {false, _} ->
- true
- end,
+ {ConsumerIsActive, ActivityStatus} =
+ case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ {true, single_active};
+ {true, _} ->
+ {false, waiting};
+ {false, _} ->
+ {true, up}
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, ConsumerIsActive, Args),
+ PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
@@ -1427,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre
{Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
rabbit_core_metrics:consumer_updated(
ChPid, Tag, false, Ack, qname(State),
- Prefetch, true, Args
+ Prefetch, true, single_active, Args
),
ok;
_ ->
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 9c516bda3d..d4c065e64f 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Id, Pid, _} = Key, _, _, _, _, _}, none)
+ ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
when Table == consumer_created ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b7a949313c..ad02205e02 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -464,7 +464,7 @@ apply(_, {down, ConsumerPid, noconnection},
%% TODO: need to increment credit here
%% with the size of the Checked map
Credit = increase_credit(C, maps:size(Checked0)),
- Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
{maps:put(K, C#consumer{suspected_down = true,
credit = Credit,
checked_out = #{}}, Co),
@@ -531,7 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
when node(P) =:= Node ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc),
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
CAcc, SQAcc, EAcc1);
@@ -548,11 +548,11 @@ apply(_, #update_config{config = Conf}, State) ->
{update_config(Conf, State), ok}.
consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
- fun(State, ConsumerId, Consumer, Active, Effects) ->
- consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects)
+ fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects)
end;
consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
- fun(_, _, _, _, Effects) ->
+ fun(_, _, _, _, _, Effects) ->
Effects
end.
@@ -869,7 +869,8 @@ cancel_consumer(ConsumerId,
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
- NewActiveConsumer, true, Effects1),
+ NewActiveConsumer, true,
+ single_active, Effects1),
{State, Effects2};
error ->
% The cancelled consumer is not the active one
@@ -883,7 +884,8 @@ cancel_consumer(ConsumerId,
end.
consumer_update_active_effects(#state{queue_resource = QName },
- ConsumerId, #consumer{meta = Meta}, Active,
+ ConsumerId, #consumer{meta = Meta},
+ Active, ActivityStatus,
Effects) ->
Ack = maps:get(ack, Meta, undefined),
Prefetch = maps:get(prefetch, Meta, undefined),
@@ -891,7 +893,7 @@ consumer_update_active_effects(#state{queue_resource = QName },
[{mod_call,
rabbit_quorum_queue,
update_consumer_handler,
- [QName, ConsumerId, false, Ack, Prefetch, Active, Args]}
+ [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
cancel_consumer0(ConsumerId,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index dc616899f4..6c8c99516d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -26,7 +26,7 @@
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
--export([update_consumer_handler/7, update_consumer/8]).
+-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
@@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) ->
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]).
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) ->
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, Active, Args).
+ QName, Prefetch, Active, ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
@@ -440,19 +440,20 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChP
fun rabbit_fifo:query_single_active_consumer/1),
SingleActiveConsumerOn = single_active_consumer_on(Q),
- IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- true;
- {true, {value, {ConsumerTag, ChPid}}} ->
- true;
- _ ->
- false
- end,
+ {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ {true, up};
+ {true, {value, {ConsumerTag, ChPid}}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end,
%% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer, Args),
+ ConsumerPrefetchCount, IsSingleActiveConsumer,
+ ActivityStatus, Args),
{ok, QState}.
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 0518c05dbd..7ae43aa7e1 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -300,7 +300,7 @@ consumer_metrics(Config) ->
CTag = <<"tag">>,
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
consumer_created, [DeadPid, CTag, true, true,
- QName, 1, false, []]),
+ QName, 1, false, waiting, []]),
Id = {QName, DeadPid, CTag},
[_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]),