diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-24 10:50:53 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-24 10:50:53 +0100 |
| commit | 9eaa79d5f80ec3025ce0dbbac5e81a60437dec7c (patch) | |
| tree | d0d41eca95019bcbf99e2446f272d0ee07c857be | |
| parent | 9f40da59d21e38c68659580338224d14c2a29337 (diff) | |
| download | rabbitmq-server-git-9eaa79d5f80ec3025ce0dbbac5e81a60437dec7c.tar.gz | |
Add consumer activity status to metrics
[#163298456]
Fixes #1838
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 29 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 2 |
5 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 38454d79e0..5782bc6e23 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1296,17 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, QName = qname(State1), AckRequired = not NoAck, TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), - ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of - {true, TheConsumer} -> - true; - {true, _} -> - false; - {false, _} -> - true - end, + {ConsumerIsActive, ActivityStatus} = + case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + {true, single_active}; + {true, _} -> + {false, waiting}; + {false, _} -> + {true, up} + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, ConsumerIsActive, Args), + PrefetchCount, ConsumerIsActive, ActivityStatus, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), @@ -1427,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer), rabbit_core_metrics:consumer_updated( ChPid, Tag, false, Ack, qname(State), - Prefetch, true, Args + Prefetch, true, single_active, Args ), ok; _ -> diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 9c516bda3d..d4c065e64f 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({{Id, Pid, _} = Key, _, _, _, _, _}, none) + ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b7a949313c..ad02205e02 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -464,7 +464,7 @@ apply(_, {down, ConsumerPid, noconnection}, %% TODO: need to increment credit here %% with the size of the Checked map Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), {maps:put(K, C#consumer{suspected_down = true, credit = Credit, checked_out = #{}}, Co), @@ -531,7 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) when node(P) =:= Node -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc), + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, CAcc, SQAcc, EAcc1); @@ -548,11 +548,11 @@ apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. consumer_active_flag_update_function(#state{consumer_strategy = default}) -> - fun(State, ConsumerId, Consumer, Active, Effects) -> - consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects) + fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> + consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> - fun(_, _, _, _, Effects) -> + fun(_, _, _, _, _, Effects) -> Effects end. @@ -869,7 +869,8 @@ cancel_consumer(ConsumerId, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, Effects1), + NewActiveConsumer, true, + single_active, Effects1), {State, Effects2}; error -> % The cancelled consumer is not the active one @@ -883,7 +884,8 @@ cancel_consumer(ConsumerId, end. consumer_update_active_effects(#state{queue_resource = QName }, - ConsumerId, #consumer{meta = Meta}, Active, + ConsumerId, #consumer{meta = Meta}, + Active, ActivityStatus, Effects) -> Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), @@ -891,7 +893,7 @@ consumer_update_active_effects(#state{queue_resource = QName }, [{mod_call, rabbit_quorum_queue, update_consumer_handler, - [QName, ConsumerId, false, Ack, Prefetch, Active, Args]} + [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. cancel_consumer0(ConsumerId, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index dc616899f4..6c8c99516d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -26,7 +26,7 @@ -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). --export([update_consumer_handler/7, update_consumer/8]). +-export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). @@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. -update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) -> +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, - [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]). + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]). -update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) -> +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, - QName, Prefetch, Active, Args). + QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). @@ -440,19 +440,20 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChP fun rabbit_fifo:query_single_active_consumer/1), SingleActiveConsumerOn = single_active_consumer_on(Q), - IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of - {false, _} -> - true; - {true, {value, {ConsumerTag, ChPid}}} -> - true; - _ -> - false - end, + {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + {true, up}; + {true, {value, {ConsumerTag, ChPid}}} -> + {true, single_active}; + _ -> + {false, waiting} + end, %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, Args), + ConsumerPrefetchCount, IsSingleActiveConsumer, + ActivityStatus, Args), {ok, QState}. basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 0518c05dbd..7ae43aa7e1 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -300,7 +300,7 @@ consumer_metrics(Config) -> CTag = <<"tag">>, rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, consumer_created, [DeadPid, CTag, true, true, - QName, 1, false, []]), + QName, 1, false, waiting, []]), Id = {QName, DeadPid, CTag}, [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]), |
