summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-15 15:12:09 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-15 15:12:09 +0100
commitbc05865808fe0bb965f1301dc7b0da272dd03426 (patch)
tree371153a74f23e290470a6786f66001e3b9da914c
parentfce1d722bbf3fa664789e65c169726b1445857f9 (diff)
downloadrabbitmq-server-git-bc05865808fe0bb965f1301dc7b0da272dd03426.tar.gz
Integrate single active consumer with metrics
* Add single consumer tag and channel PID to metrics infos in quorum queues * Include waiting consumers in consumer count in quorum queues * Add single active consumer flag in consumer metrics * Update metrics for newly-promoted single active consumer [#163089472] References rabbitmq/rabbitmq-management#649
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_fifo.erl41
-rw-r--r--src/rabbit_queue_consumers.erl10
-rw-r--r--src/rabbit_quorum_queue.erl66
5 files changed, 122 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b1cac7d3a1..84428dd572 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -857,6 +857,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
active_consumer = Holder1},
+ maybe_notify_consumer_updated(State2, Holder, Holder1),
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@@ -874,11 +875,12 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
case CurrentSingleActiveConsumer of
{DownChPid, _} ->
+ % the single active consumer is on the down channel, we have to replace it
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
- false ->
+ _ ->
CurrentSingleActiveConsumer
end;
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
@@ -1268,7 +1270,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- active_consumer = NewConsumer}};
+ active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
@@ -1289,7 +1291,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
end,
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- active_consumer = ExclusiveConsumer}}
+ active_consumer = ExclusiveConsumer}}
end
end,
case ConsumerRegistration of
@@ -1299,9 +1301,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
QName = qname(State1),
AckRequired = not NoAck,
+ TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
+ IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ true;
+ _ ->
+ false
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
+ PrefetchCount, IsSingleActiveConsumer, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
@@ -1323,6 +1332,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
),
State1 = State#q{consumers = Consumers1,
active_consumer = Holder1},
+ maybe_notify_consumer_updated(State1, Holder, Holder1),
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@@ -1410,6 +1420,24 @@ new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleA
_ -> CurrentSingleActiveConsumer
end.
+maybe_notify_consumer_updated(#q{single_active_consumer_on = false}, _, _) ->
+ ok;
+maybe_notify_consumer_updated(#q{single_active_consumer_on = true}, SingleActiveConsumer, SingleActiveConsumer) ->
+ % the single active consumer didn't change, nothing to do
+ ok;
+maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _PreviousConsumer, NewConsumer) ->
+ case NewConsumer of
+ {ChPid, Consumer} ->
+ {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
+ rabbit_core_metrics:consumer_updated(
+ ChPid, Tag, false, Ack, qname(State),
+ Prefetch, true, Args
+ ),
+ ok;
+ _ ->
+ ok
+ end.
+
handle_cast(init, State) ->
try
init_it({no_barrier, non_clean_shutdown}, none, State)
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index c8b3ad10b8..9c516bda3d 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Id, Pid, _} = Key, _, _, _, _}, none)
+ ({{Id, Pid, _} = Key, _, _, _, _, _}, none)
when Table == consumer_created ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4fe4d954b9..b7169bf924 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -43,6 +43,7 @@
query_consumer_count/1,
query_consumers/1,
query_stat/1,
+ query_single_active_consumer/1,
usage/1,
zero/1,
@@ -622,14 +623,13 @@ tick(_Ts, #state{name = Name,
queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- consumers = Cons,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
- maps:size(Cons), % Consumers
+ query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
@@ -723,6 +723,17 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
+query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) ->
+ case maps:size(Consumers) of
+ 1 ->
+ {value, lists:nth(1, maps:keys(Consumers))};
+ _
+ ->
+ {error, illegal_size}
+ end ;
+query_single_active_consumer(_) ->
+ disabled.
+
query_stat(#state{messages = M,
consumers = Consumers}) ->
{maps:size(M), maps:size(Consumers)}.
@@ -796,7 +807,8 @@ cancel_consumer(ConsumerId,
State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- {Effects, State1};
+ Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects),
+ {Effects1, State1};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -806,6 +818,13 @@ cancel_consumer(ConsumerId,
{Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
+consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active,
+ queue_resource = QName },
+ ConsumerId, #consumer{meta = Meta}, Effects) ->
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
+ [QName, ConsumerId, false, maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects].
+
cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
@@ -2075,8 +2094,8 @@ single_active_consumer_test() ->
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects2)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects2)),
% cancelling the active consumer
{State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
@@ -2085,8 +2104,8 @@ single_active_consumer_test() ->
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects3)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects3)),
% cancelling the last consumer
{State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
@@ -2130,8 +2149,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % the effect to unregister the consumer is there
- ?assertEqual(1, length(Effects)),
+ % effects to unregister the consumer and to update the new active one (metrics) are there
+ ?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
{State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
@@ -2139,8 +2158,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
?assertEqual(0, length(State3#state.waiting_consumers)),
- % effects to cancel both consumers of this channel
- ?assertEqual(2, length(Effects2)),
+ % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+ ?assertEqual(3, length(Effects2)),
% the last channel goes down
{State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index e743fbce18..dd086c1af6 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -22,7 +22,7 @@
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
- consumer_tag/1]).
+ consumer_tag/1, get_infos/1]).
%%----------------------------------------------------------------------------
@@ -100,7 +100,9 @@
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
+-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer().
-spec consumer_tag(consumer()) -> rabbit_types:ctag().
+-spec get_infos(consumer()) -> term().
%%----------------------------------------------------------------------------
@@ -411,9 +413,15 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
{{value, Consumer, _Priority}, _Tail} -> Consumer
end.
+get_infos(Consumer) ->
+ {Consumer#consumer.tag,Consumer#consumer.ack_required,
+ Consumer#consumer.prefetch, Consumer#consumer.args}.
+
consumer_tag(#consumer{tag = CTag}) ->
CTag.
+
+
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 9dc9b7ab85..0a3a54b052 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -26,6 +26,7 @@
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
+-export([update_consumer_handler/7, update_consumer/8]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
@@ -162,17 +163,16 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]).
+
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+ catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
+ QName, Prefetch, SingleActive, Args).
+
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- Node = node(ChPid),
- case Node == node() of
- true -> cancel_consumer(QName, ChPid, ConsumerTag);
- false ->
- %% this could potentially block for a while if the node is
- %% in disconnected state or tcp buffers are full
- rpc:cast(Node, rabbit_quorum_queue,
- cancel_consumer,
- [QName, ChPid, ConsumerTag])
- end.
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
@@ -182,6 +182,17 @@ cancel_consumer(QName, ChPid, ConsumerTag) ->
{queue, QName},
{user_who_performed_action, ?INTERNAL_USER}]).
+local_or_remote_handler(ChPid, Module, Function, Args) ->
+ Node = node(ChPid),
+ case Node == node() of
+ true ->
+ erlang:apply(Module, Function, Args);
+ false ->
+ %% this could potentially block for a while if the node is
+ %% in disconnected state or tcp buffers are full
+ rpc:cast(Node, Module, Function, Args)
+ end.
+
become_leader(QName, Name) ->
Fun = fun(Q1) ->
Q1#amqqueue{pid = {Name, node()},
@@ -374,7 +385,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
+basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
ActingUser, OkMsg, QState0) ->
%% TODO: validate consumer arguments
@@ -396,10 +407,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
Prefetch,
ConsumerMeta,
QState0),
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ IsSingleActiveConsumer = case SacResult of
+ {value, {ConsumerTag, ChPid}} ->
+ true;
+ _ ->
+ false
+ end,
+
%% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
- ConsumerPrefetchCount, Args),
+ ConsumerPrefetchCount, IsSingleActiveConsumer, Args),
{ok, QState}.
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
@@ -713,10 +733,24 @@ i(open_files, #amqqueue{pid = {Name, _},
quorum_nodes = Nodes}) ->
{Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
lists:flatten(Data);
-i(single_active_consumer_pid, _Q) ->
- '';
-i(single_active_consumer_ctag, _Q) ->
- '';
+i(single_active_consumer_pid, #amqqueue{pid = QPid}) ->
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ case SacResult of
+ {value, {_ConsumerTag, ChPid}} ->
+ ChPid;
+ _ ->
+ ''
+ end;
+i(single_active_consumer_ctag, #amqqueue{pid = QPid}) ->
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ case SacResult of
+ {value, {ConsumerTag, _ChPid}} ->
+ ConsumerTag;
+ _ ->
+ ''
+ end;
i(_K, _Q) -> ''.
open_files(Name) ->