summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-21 14:34:52 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-21 14:34:52 +0100
commit9f40da59d21e38c68659580338224d14c2a29337 (patch)
treee66d1571cfd447372c49e6f57daab6007d449550
parentdc9b575d8f81a990b531e7c89bb473bf66a7dbef (diff)
downloadrabbitmq-server-git-9f40da59d21e38c68659580338224d14c2a29337.tar.gz
Return active and activity status when listing consumers
[#163298456] Fixes #1838
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_fifo.erl97
-rw-r--r--src/rabbit_queue_consumers.erl38
-rw-r--r--test/unit_queue_consumers_SUITE.erl30
5 files changed, 138 insertions, 37 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 66e7cf0a3c..63d89ff4a7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -224,7 +224,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
- single_active, arguments]).
+ active, activity_status, arguments]).
warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag,
- AckRequired, Prefetch, SingleActive, Args]) ||
- {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
+ AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index abf2f64f8f..38454d79e0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
- {Ch, CTag, _, _, _, _, _} <-
+ {Ch, CTag, _, _, _, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
- reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
+ reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State);
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index e1d469f830..b7a949313c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -727,31 +727,48 @@ query_consumer_count(#state{consumers = Consumers,
maps:size(Consumers) + length(WaitingConsumers).
query_consumers(#state{consumers = Consumers,
- waiting_consumers = WaitingConsumers} = State) ->
- SingleActiveConsumer = query_single_active_consumer(State),
- IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
- case SingleActiveConsumer of
- {value, {Tag, Pid}} ->
- true;
- _ ->
- false
- end
- end,
- FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
+ waiting_consumers = WaitingConsumers,
+ consumer_strategy = ConsumerStrategy } = State) ->
+ ActiveActivityStatusFun = case ConsumerStrategy of
+ default ->
+ fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) ->
+ case SuspectedDown of
+ true ->
+ {false, suspected_down};
+ false ->
+ {true, up}
+ end
+ end;
+ single_active ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ fun({Tag, Pid} = _Consumer, _) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end
+ end,
+ FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
- IsSingleActiveConsumerFun({Tag, Pid}),
+ Active,
+ ActivityStatus,
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers),
FromWaitingConsumers =
- lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) ->
+ lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
maps:put({Tag, Pid},
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
- IsSingleActiveConsumerFun({Tag, Pid}),
+ Active,
+ ActivityStatus,
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)},
Acc)
@@ -761,6 +778,8 @@ query_consumers(#state{consumers = Consumers,
query_single_active_consumer(#state{consumer_strategy = single_active,
consumers = Consumers}) ->
case maps:size(Consumers) of
+ 0 ->
+ {error, no_value};
1 ->
{value, lists:nth(1, maps:keys(Consumers))};
_
@@ -2345,6 +2364,44 @@ query_consumers_test() ->
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
+ single_active_consumer_on => false}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ Consumers0 = State1#state.consumers,
+ Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0),
+ State2 = State1#state{consumers = Consumers1},
+
+ ?assertEqual(4, query_consumer_count(State2)),
+ Consumers2 = query_consumers(State2),
+ ?assertEqual(4, maps:size(Consumers2)),
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag2">> ->
+ ?assertNot(Active),
+ ?assertEqual(suspected_down, ActivityStatus);
+ _ ->
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end
+ end, [], Consumers2).
+
+query_consumers_when_single_active_consumer_is_on_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
single_active_consumer_on => true}),
% adding some consumers
@@ -2362,8 +2419,16 @@ query_consumers_test() ->
?assertEqual(4, query_consumer_count(State1)),
Consumers = query_consumers(State1),
?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) ->
- ?assertEqual(self(), Pid)
+ maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ <<"ctag1">> ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
end, [], Consumers).
active_flag_updated_when_consumer_suspected_unsuspected_test() ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 704c1a46ae..7a0c0f98e3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
+-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
@@ -118,24 +118,32 @@ inactive(#state{consumers = Consumers}) ->
priority_queue:is_empty(Consumers).
all(State) ->
- all(State, none).
-
-all(#state{consumers = Consumers}, SingleActiveConsumer) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
- consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
-
-consumers(Consumers, SingleActiveConsumer, Acc) ->
+ all(State, none, false).
+
+all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end,
+ consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()).
+
+consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) ->
+ ActiveActivityStatusFun = case SingleActiveConsumerOn of
+ true ->
+ fun({ChPid, Consumer}) ->
+ case SingleActiveConsumer of
+ {ChPid, Consumer} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end;
+ false ->
+ fun(_) -> {true, up} end
+ end,
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
args = Args, user = Username} = Consumer,
- IsSingleActive = case SingleActiveConsumer of
- {ChPid, Consumer} ->
- true;
- _ ->
- false
- end,
- [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
+ {Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}),
+ [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
index 08b2da4574..39838b0874 100644
--- a/test/unit_queue_consumers_SUITE.erl
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -25,7 +25,8 @@ all() ->
[
is_same,
get_consumer,
- get
+ get,
+ list_consumers
].
is_same(_Config) ->
@@ -79,6 +80,33 @@ get_consumer(_Config) ->
),
ok.
+list_consumers(_Config) ->
+ State = state(consumers([consumer(self(), <<"1">>), consumer(self(), <<"2">>), consumer(self(), <<"3">>)])),
+ Consumer = rabbit_queue_consumers:get_consumer(State),
+ {_Pid, ConsumerRecord} = Consumer,
+ CTag = rabbit_queue_consumers:consumer_tag(ConsumerRecord),
+ ConsumersWithSingleActive = rabbit_queue_consumers:all(State, Consumer, true),
+ ?assertEqual(3, length(ConsumersWithSingleActive)),
+ lists:foldl(fun({Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ case Tag of
+ CTag ->
+ ?assert(Active),
+ ?assertEqual(single_active, ActivityStatus);
+ _ ->
+ ?assertNot(Active),
+ ?assertEqual(waiting, ActivityStatus)
+ end
+ end, [], ConsumersWithSingleActive),
+ ConsumersNoSingleActive = rabbit_queue_consumers:all(State, none, false),
+ ?assertEqual(3, length(ConsumersNoSingleActive)),
+ lists:foldl(fun({Pid, _, _, _, Active, ActivityStatus, _, _}, _Acc) ->
+ ?assertEqual(self(), Pid),
+ ?assert(Active),
+ ?assertEqual(up, ActivityStatus)
+ end, [], ConsumersNoSingleActive),
+ ok.
+
consumers([]) ->
priority_queue:new();
consumers(Consumers) ->