summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-01-28 20:49:05 +0400
committerGitHub <noreply@github.com>2019-01-28 20:49:05 +0400
commitdd947c6081e5497e74aa2ba8ff4386971bac69dc (patch)
treeeff2f2f31d572615bc2453378f3b760d1323bb9f
parenta4b602567081b28c4bc53ac5995b5c054a305da9 (diff)
parentdcf663fcf99eeefb1c0de07e7389b47988cfbd92 (diff)
downloadrabbitmq-server-git-dd947c6081e5497e74aa2ba8ff4386971bac69dc.tar.gz
Merge pull request #1839 from rabbitmq/rabbitmq-server-1838-active-field-for-consumers
Update active flag for consumers
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_fifo.erl244
-rw-r--r--src/rabbit_queue_consumers.erl38
-rw-r--r--src/rabbit_quorum_queue.erl31
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl2
-rw-r--r--test/unit_queue_consumers_SUITE.erl30
8 files changed, 283 insertions, 93 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 66e7cf0a3c..63d89ff4a7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -224,7 +224,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
- single_active, arguments]).
+ active, activity_status, arguments]).
warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag,
- AckRequired, Prefetch, SingleActive, Args]) ||
- {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
+ AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index af42d68359..5782bc6e23 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
- {Ch, CTag, _, _, _, _, _} <-
+ {Ch, CTag, _, _, _, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
- reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
+ reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State);
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1296,15 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
QName = qname(State1),
AckRequired = not NoAck,
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
- IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
- {true, TheConsumer} ->
- true;
- _ ->
- false
- end,
+ {ConsumerIsActive, ActivityStatus} =
+ case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ {true, single_active};
+ {true, _} ->
+ {false, waiting};
+ {false, _} ->
+ {true, up}
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, IsSingleActiveConsumer, Args),
+ PrefetchCount, ConsumerIsActive, ActivityStatus, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
@@ -1425,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre
{Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
rabbit_core_metrics:consumer_updated(
ChPid, Tag, false, Ack, qname(State),
- Prefetch, true, Args
+ Prefetch, true, single_active, Args
),
ok;
_ ->
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 9c516bda3d..d4c065e64f 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Id, Pid, _} = Key, _, _, _, _, _}, none)
+ ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
when Table == consumer_created ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index c3b5e66355..612cd52dcc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -460,22 +460,24 @@ apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
- {Cons, State} = maps:fold(
- fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- Credit = increase_credit(C, maps:size(Checked0)),
- {maps:put(K, C#consumer{suspected_down = true,
- credit = Credit,
- checked_out = #{}}, Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ {Cons, State, Effects1} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0, Eff}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ Credit = increase_credit(C, maps:size(Checked0)),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St, Eff1};
+ (K, C, {Co, St, Eff}) ->
+ {maps:put(K, C, Co), St, Eff}
+ end, {#{}, State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -483,17 +485,17 @@ apply(_, {down, ConsumerPid, noconnection},
% mark waiting consumers as suspected if necessary
WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true),
- Effects = case maps:size(Cons) of
- 0 ->
- [{aux, inactive}, {monitor, node, Node}];
- _ ->
- [{monitor, node, Node}]
- end,
+ Effects2 = case maps:size(Cons) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs,
- waiting_consumers = WaitingConsumers}, ok, Effects};
+ waiting_consumers = WaitingConsumers}, ok, Effects2};
apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -531,12 +533,14 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
E#enqueuer{suspected_down = false};
(_, E) -> E
end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
when node(P) =:= Node ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
- CAcc, SQAcc, EAcc);
+ CAcc, SQAcc, EAcc1);
(_, _, Acc) ->
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
@@ -549,6 +553,15 @@ apply(_, {nodedown, _Node}, State) ->
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
+consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
+ fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects)
+ end;
+consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
+ fun(_, _, _, _, _, Effects) ->
+ Effects
+ end.
+
handle_waiting_consumer_down(_Pid,
#state{consumer_strategy = default} = State) ->
{[], State};
@@ -718,31 +731,48 @@ query_consumer_count(#state{consumers = Consumers,
maps:size(Consumers) + length(WaitingConsumers).
query_consumers(#state{consumers = Consumers,
- waiting_consumers = WaitingConsumers} = State) ->
- SingleActiveConsumer = query_single_active_consumer(State),
- IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
- case SingleActiveConsumer of
- {value, {Tag, Pid}} ->
- true;
- _ ->
- false
- end
- end,
- FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
+ waiting_consumers = WaitingConsumers,
+ consumer_strategy = ConsumerStrategy } = State) ->
+ ActiveActivityStatusFun = case ConsumerStrategy of
+ default ->
+ fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) ->
+ case SuspectedDown of
+ true ->
+ {false, suspected_down};
+ false ->
+ {true, up}
+ end
+ end;
+ single_active ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ fun({Tag, Pid} = _Consumer, _) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end
+ end,
+ FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
- IsSingleActiveConsumerFun({Tag, Pid}),
+ Active,
+ ActivityStatus,
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers),
FromWaitingConsumers =
- lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
+ lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
- IsSingleActiveConsumerFun({Tag, Pid}),
+ Active,
+ ActivityStatus,
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)},
Acc)
@@ -752,6 +782,8 @@ query_consumers(#state{consumers = Consumers,
query_single_active_consumer(#state{consumer_strategy = single_active,
consumers = Consumers}) ->
case maps:size(Consumers) of
+ 0 ->
+ {error, no_value};
1 ->
{value, lists:nth(1, maps:keys(Consumers))};
_
@@ -852,8 +884,10 @@ cancel_consumer(ConsumerId,
NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- add_consumer_promotion_effect(State, NewActiveConsumerId,
- NewActiveConsumer, Effects1);
+ Effects2 = consumer_update_active_effects(State, NewActiveConsumerId,
+ NewActiveConsumer, true,
+ single_active, Effects1),
+ {State, Effects2};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -865,18 +899,18 @@ cancel_consumer(ConsumerId,
{State0#state{waiting_consumers = WaitingConsumers}, Effects}
end.
-add_consumer_promotion_effect(#state{consumer_strategy = single_active,
- queue_resource = QName} = State,
- ConsumerId, #consumer{meta = Meta},
- Effects) ->
+consumer_update_active_effects(#state{queue_resource = QName },
+ ConsumerId, #consumer{meta = Meta},
+ Active, ActivityStatus,
+ Effects) ->
Ack = maps:get(ack, Meta, undefined),
Prefetch = maps:get(prefetch, Meta, undefined),
Args = maps:get(args, Meta, []),
- {State, [{mod_call,
- rabbit_quorum_queue,
- update_consumer_handler,
- [QName, ConsumerId, false, Ack, Prefetch, true, Args]}
- | Effects]}.
+ [{mod_call,
+ rabbit_quorum_queue,
+ update_consumer_handler,
+ [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
+ | Effects].
cancel_consumer0(ConsumerId,
#state{consumers = C0} = S0, Effects0) ->
@@ -2429,6 +2463,44 @@ query_consumers_test() ->
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
+ single_active_consumer_on => false}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ Consumers0 = State1#state.consumers,
+ Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0),
+ State2 = State1#state{consumers = Consumers1},
+
+ ?assertEqual(4, query_consumer_count(State2)),
+ Consumers2 = query_consumers(State2),
+ ?assertEqual(4, maps:size(Consumers2)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag2">> ->
+ ?assertNot(Active),
+ ?assertEqual(suspected_down, ActivityStatus);
+ _ ->
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end
+ end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
single_active_consumer_on => true}),
Meta = #{index => 1},
% adding some consumers
@@ -2446,10 +2518,84 @@ query_consumers_test() ->
?assertEqual(4, query_consumer_count(State1)),
Consumers = query_consumers(State1),
?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) ->
- ?assertEqual(self(), Pid)
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag1">> ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
end, [], Consumers).
+active_flag_updated_when_consumer_suspected_unsuspected_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{index => 1},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ % only 1 effect to monitor the node
+ ?assertEqual(1, length(Effects2)),
+
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(4, length(Effects3)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 704c1a46ae..7a0c0f98e3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
+-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
@@ -118,24 +118,32 @@ inactive(#state{consumers = Consumers}) ->
priority_queue:is_empty(Consumers).
all(State) ->
- all(State, none).
-
-all(#state{consumers = Consumers}, SingleActiveConsumer) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
- consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
-
-consumers(Consumers, SingleActiveConsumer, Acc) ->
+ all(State, none, false).
+
+all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end,
+ consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()).
+
+consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) ->
+ ActiveActivityStatusFun = case SingleActiveConsumerOn of
+ true ->
+ fun({ChPid, Consumer}) ->
+ case SingleActiveConsumer of
+ {ChPid, Consumer} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end;
+ false ->
+ fun(_) -> {true, up} end
+ end,
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
args = Args, user = Username} = Consumer,
- IsSingleActive = case SingleActiveConsumer of
- {ChPid, Consumer} ->
- true;
- _ ->
- false
- end,
- [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
+ {Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}),
+ [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 1f3aa1f38b..114af4fdfb 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -26,7 +26,7 @@
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
--export([update_consumer_handler/7, update_consumer/8]).
+-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
@@ -169,13 +169,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]).
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, SingleActive, Args).
+ QName, Prefetch, Active, ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
@@ -419,7 +419,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
+basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
ActingUser, OkMsg, QState0) ->
%% TODO: validate consumer arguments
@@ -443,17 +443,22 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
QState0),
{ok, {_, SacResult}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1),
- IsSingleActiveConsumer = case SacResult of
- {value, {ConsumerTag, ChPid}} ->
- true;
- _ ->
- false
- end,
+
+ SingleActiveConsumerOn = single_active_consumer_on(Q),
+ {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ {true, up};
+ {true, {value, {ConsumerTag, ChPid}}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end,
%% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer, Args),
+ ConsumerPrefetchCount, IsSingleActiveConsumer,
+ ActivityStatus, Args),
{ok, QState}.
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 0518c05dbd..7ae43aa7e1 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -300,7 +300,7 @@ consumer_metrics(Config) ->
CTag = <<"tag">>,
rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
consumer_created, [DeadPid, CTag, true, true,
- QName, 1, false, []]),
+ QName, 1, false, waiting, []]),
Id = {QName, DeadPid, CTag},
[_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]),
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
index 08b2da4574..39838b0874 100644
--- a/test/unit_queue_consumers_SUITE.erl
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -25,7 +25,8 @@ all() ->
[
is_same,
get_consumer,
- get
+ get,
+ list_consumers
].
is_same(_Config) ->
@@ -79,6 +80,33 @@ get_consumer(_Config) ->
),
ok.
+list_consumers(_Config) ->
+ State = state(consumers([consumer(self(), <<"1">>), consumer(self(), <<"2">>), consumer(self(), <<"3">>)])),
+ Consumer = rabbit_queue_consumers:get_consumer(State),
+ {_Pid, ConsumerRecord} = Consumer,
+ CTag = rabbit_queue_consumers:consumer_tag(ConsumerRecord),
+ ConsumersWithSingleActive = rabbit_queue_consumers:all(State, Consumer, true),
+ ?assertEqual(3, length(ConsumersWithSingleActive)),
+ lists:foldl(fun({Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ CTag ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
+ end, [], ConsumersWithSingleActive),
+ ConsumersNoSingleActive = rabbit_queue_consumers:all(State, none, false),
+ ?assertEqual(3, length(ConsumersNoSingleActive)),
+ lists:foldl(fun({Pid, _, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end, [], ConsumersNoSingleActive),
+ ok.
+
consumers([]) ->
priority_queue:new();
consumers(Consumers) ->