diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 58 |
1 files changed, 47 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 6fe8677e94..fc699111d1 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -700,17 +700,27 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> query_ra_indexes(#state{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#state{consumers = Consumers}) -> - maps:size(Consumers). - -query_consumers(#state{consumers = Consumers}) -> - maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers). +query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + maps:size(Consumers) + length(WaitingConsumers). + +query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)} + end, Consumers), + FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, WaitingConsumers), + maps:merge(FromConsumers, FromWaitingConsumers). %% other -spec usage(atom()) -> float(). @@ -2177,6 +2187,32 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti ok. +query_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, query_consumer_count(State1)), + Consumers = query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) -> + ?assertEqual(self(), Pid) + end, [], Consumers). + meta(Idx) -> #{index => Idx, term => 1}. |
