diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 21 |
4 files changed, 34 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fe73e760b2..66e7cf0a3c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -224,7 +224,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, - arguments]). + single_active, arguments]). warn_file_limit() -> DurableQueues = find_recoverable_queues(), @@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) -> get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, [Q#amqqueue.name, ChPid, CTag, - AckRequired, Prefetch, Args]) || - {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)]. + AckRequired, Prefetch, SingleActive, Args]) || + {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)]. stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q); stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 84428dd572..5eb72bfffd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || - {Ch, CTag, _, _, _} <- + {Ch, CTag, _, _, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -1216,8 +1216,10 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, State = #q{consumers = Consumers}) -> +handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) -> reply(rabbit_queue_consumers:all(Consumers), State); +handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) -> + reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b7169bf924..48c9379970 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -704,11 +704,21 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) -> query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). -query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> +query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) -> + SingleActiveConsumer = query_single_active_consumer(State), + IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + true; + _ -> + false + end + end, FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), + IsSingleActiveConsumerFun({Tag, Pid}), maps:get(args, Meta, []), maps:get(username, Meta, undefined)} end, Consumers), @@ -717,6 +727,7 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), + IsSingleActiveConsumerFun({Tag, Pid}), maps:get(args, Meta, []), maps:get(username, Meta, undefined)}, Acc) diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index dd086c1af6..704c1a46ae 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, all/1, count/0, +-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, @@ -117,16 +117,25 @@ max_active_priority(#state{consumers = Consumers}) -> inactive(#state{consumers = Consumers}) -> priority_queue:is_empty(Consumers). -all(#state{consumers = Consumers}) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(Consumers, []), all_ch_record()). +all(State) -> + all(State, none). -consumers(Consumers, Acc) -> +all(#state{consumers = Consumers}, SingleActiveConsumer) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end, + consumers(Consumers, SingleActiveConsumer, []), all_ch_record()). + +consumers(Consumers, SingleActiveConsumer, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, args = Args, user = Username} = Consumer, - [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1] + IsSingleActive = case SingleActiveConsumer of + {ChPid, Consumer} -> + true; + _ -> + false + end, + [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). |
