summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_fifo.erl13
-rw-r--r--src/rabbit_queue_consumers.erl21
4 files changed, 34 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fe73e760b2..66e7cf0a3c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -224,7 +224,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
- arguments]).
+ single_active, arguments]).
warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag,
- AckRequired, Prefetch, Args]) ||
- {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
+ AckRequired, Prefetch, SingleActive, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 84428dd572..5eb72bfffd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
- {Ch, CTag, _, _, _} <-
+ {Ch, CTag, _, _, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -1216,8 +1216,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
+handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
+handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
+ reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b7169bf924..48c9379970 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -704,11 +704,21 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
+query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ true;
+ _ ->
+ false
+ end
+ end,
FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
+ IsSingleActiveConsumerFun({Tag, Pid}),
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers),
@@ -717,6 +727,7 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
+ IsSingleActiveConsumerFun({Tag, Pid}),
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)},
Acc)
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index dd086c1af6..704c1a46ae 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
+-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
@@ -117,16 +117,25 @@ max_active_priority(#state{consumers = Consumers}) ->
inactive(#state{consumers = Consumers}) ->
priority_queue:is_empty(Consumers).
-all(#state{consumers = Consumers}) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(Consumers, []), all_ch_record()).
+all(State) ->
+ all(State, none).
-consumers(Consumers, Acc) ->
+all(#state{consumers = Consumers}, SingleActiveConsumer) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
+ consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
+
+consumers(Consumers, SingleActiveConsumer, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
args = Args, user = Username} = Consumer,
- [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1]
+ IsSingleActive = case SingleActiveConsumer of
+ {ChPid, Consumer} ->
+ true;
+ _ ->
+ false
+ end,
+ [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).