diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-14 11:01:43 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-14 11:01:43 +0100 |
| commit | 4502b9180e099a3ee9009ed99c9e2ba11d66b9b7 (patch) | |
| tree | eb7433ed2abd812175714147f4e71ee79585925d /src | |
| parent | 2755617b7a85208e13f0d00c9a36772672363be7 (diff) | |
| download | rabbitmq-server-git-4502b9180e099a3ee9009ed99c9e2ba11d66b9b7.tar.gz | |
Add waiting consumers when listing consumers in QQ
Necessary when single active consumer is on, otherwise only the single
active consumer shows up in the command line.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 6fe8677e94..fc699111d1 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -700,17 +700,27 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> query_ra_indexes(#state{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#state{consumers = Consumers}) -> - maps:size(Consumers). - -query_consumers(#state{consumers = Consumers}) -> - maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers). +query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + maps:size(Consumers) + length(WaitingConsumers). + +query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)} + end, Consumers), + FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, WaitingConsumers), + maps:merge(FromConsumers, FromWaitingConsumers). %% other -spec usage(atom()) -> float(). @@ -2177,6 +2187,32 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti ok. +query_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, query_consumer_count(State1)), + Consumers = query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) -> + ?assertEqual(self(), Pid) + end, [], Consumers). + meta(Idx) -> #{index => Idx, term => 1}. |
