diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-21 14:34:52 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-21 14:34:52 +0100 |
| commit | 9f40da59d21e38c68659580338224d14c2a29337 (patch) | |
| tree | e66d1571cfd447372c49e6f57daab6007d449550 | |
| parent | dc9b575d8f81a990b531e7c89bb473bf66a7dbef (diff) | |
| download | rabbitmq-server-git-9f40da59d21e38c68659580338224d14c2a29337.tar.gz | |
Return active and activity status when listing consumers
[#163298456]
Fixes #1838
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 38 | ||||
| -rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 30 |
5 files changed, 138 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 66e7cf0a3c..63d89ff4a7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -224,7 +224,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, - single_active, arguments]). + active, activity_status, arguments]). warn_file_limit() -> DurableQueues = find_recoverable_queues(), @@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) -> get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, [Q#amqqueue.name, ChPid, CTag, - AckRequired, Prefetch, SingleActive, Args]) || - {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)]. + AckRequired, Prefetch, Active, ActivityStatus, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q); stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index abf2f64f8f..38454d79e0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || - {Ch, CTag, _, _, _, _, _} <- + {Ch, CTag, _, _, _, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) -> reply(rabbit_queue_consumers:all(Consumers), State); handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) -> - reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State); + reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index e1d469f830..b7a949313c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -727,31 +727,48 @@ query_consumer_count(#state{consumers = Consumers, maps:size(Consumers) + length(WaitingConsumers). query_consumers(#state{consumers = Consumers, - waiting_consumers = WaitingConsumers} = State) -> - SingleActiveConsumer = query_single_active_consumer(State), - IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) -> - case SingleActiveConsumer of - {value, {Tag, Pid}} -> - true; - _ -> - false - end - end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + waiting_consumers = WaitingConsumers, + consumer_strategy = ConsumerStrategy } = State) -> + ActiveActivityStatusFun = case ConsumerStrategy of + default -> + fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) -> + case SuspectedDown of + true -> + {false, suspected_down}; + false -> + {true, up} + end + end; + single_active -> + SingleActiveConsumer = query_single_active_consumer(State), + fun({Tag, Pid} = _Consumer, _) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + {true, single_active}; + _ -> + {false, waiting} + end + end + end, + FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), - IsSingleActiveConsumerFun({Tag, Pid}), + Active, + ActivityStatus, maps:get(args, Meta, []), maps:get(username, Meta, undefined)} end, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), - IsSingleActiveConsumerFun({Tag, Pid}), + Active, + ActivityStatus, maps:get(args, Meta, []), maps:get(username, Meta, undefined)}, Acc) @@ -761,6 +778,8 @@ query_consumers(#state{consumers = Consumers, query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) -> case maps:size(Consumers) of + 0 -> + {error, no_value}; 1 -> {value, lists:nth(1, maps:keys(Consumers))}; _ @@ -2345,6 +2364,44 @@ query_consumers_test() -> queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), shadow_copy_interval => 0, + single_active_consumer_on => false}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + Consumers0 = State1#state.consumers, + Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0), + State2 = State1#state{consumers = Consumers1}, + + ?assertEqual(4, query_consumer_count(State2)), + Consumers2 = query_consumers(State2), + ?assertEqual(4, maps:size(Consumers2)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag2">> -> + ?assertNot(Active), + ?assertEqual(suspected_down, ActivityStatus); + _ -> + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end + end, [], Consumers2). + +query_consumers_when_single_active_consumer_is_on_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, single_active_consumer_on => true}), % adding some consumers @@ -2362,8 +2419,16 @@ query_consumers_test() -> ?assertEqual(4, query_consumer_count(State1)), Consumers = query_consumers(State1), ?assertEqual(4, maps:size(Consumers)), - maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) -> - ?assertEqual(self(), Pid) + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag1">> -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end end, [], Consumers). active_flag_updated_when_consumer_suspected_unsuspected_test() -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 704c1a46ae..7a0c0f98e3 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0, +-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, @@ -118,24 +118,32 @@ inactive(#state{consumers = Consumers}) -> priority_queue:is_empty(Consumers). all(State) -> - all(State, none). - -all(#state{consumers = Consumers}, SingleActiveConsumer) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end, - consumers(Consumers, SingleActiveConsumer, []), all_ch_record()). - -consumers(Consumers, SingleActiveConsumer, Acc) -> + all(State, none, false). + +all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end, + consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()). + +consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) -> + ActiveActivityStatusFun = case SingleActiveConsumerOn of + true -> + fun({ChPid, Consumer}) -> + case SingleActiveConsumer of + {ChPid, Consumer} -> + {true, single_active}; + _ -> + {false, waiting} + end + end; + false -> + fun(_) -> {true, up} end + end, priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, args = Args, user = Username} = Consumer, - IsSingleActive = case SingleActiveConsumer of - {ChPid, Consumer} -> - true; - _ -> - false - end, - [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1] + {Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}), + [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl index 08b2da4574..39838b0874 100644 --- a/test/unit_queue_consumers_SUITE.erl +++ b/test/unit_queue_consumers_SUITE.erl @@ -25,7 +25,8 @@ all() -> [ is_same, get_consumer, - get + get, + list_consumers ]. is_same(_Config) -> @@ -79,6 +80,33 @@ get_consumer(_Config) -> ), ok. +list_consumers(_Config) -> + State = state(consumers([consumer(self(), <<"1">>), consumer(self(), <<"2">>), consumer(self(), <<"3">>)])), + Consumer = rabbit_queue_consumers:get_consumer(State), + {_Pid, ConsumerRecord} = Consumer, + CTag = rabbit_queue_consumers:consumer_tag(ConsumerRecord), + ConsumersWithSingleActive = rabbit_queue_consumers:all(State, Consumer, true), + ?assertEqual(3, length(ConsumersWithSingleActive)), + lists:foldl(fun({Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + CTag -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end + end, [], ConsumersWithSingleActive), + ConsumersNoSingleActive = rabbit_queue_consumers:all(State, none, false), + ?assertEqual(3, length(ConsumersNoSingleActive)), + lists:foldl(fun({Pid, _, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end, [], ConsumersNoSingleActive), + ok. + consumers([]) -> priority_queue:new(); consumers(Consumers) -> |
