summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl58
1 files changed, 47 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 6fe8677e94..fc699111d1 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -700,17 +700,27 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
RaIndexes.
-query_consumer_count(#state{consumers = Consumers}) ->
- maps:size(Consumers).
-
-query_consumers(#state{consumers = Consumers}) ->
- maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
- {Pid, Tag,
- maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined),
- maps:get(args, Meta, []),
- maps:get(username, Meta, undefined)}
- end, Consumers).
+query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
+ maps:size(Consumers) + length(WaitingConsumers).
+
+query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
+ FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)}
+ end, Consumers),
+ FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, WaitingConsumers),
+ maps:merge(FromConsumers, FromWaitingConsumers).
%% other
-spec usage(atom()) -> float().
@@ -2177,6 +2187,32 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
ok.
+query_consumers_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ ?assertEqual(4, query_consumer_count(State1)),
+ Consumers = query_consumers(State1),
+ ?assertEqual(4, maps:size(Consumers)),
+ maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid)
+ end, [], Consumers).
+
meta(Idx) ->
#{index => Idx, term => 1}.