summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_fifo.erl128
-rw-r--r--src/rabbit_queue_consumers.erl31
-rw-r--r--src/rabbit_quorum_queue.erl66
6 files changed, 239 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fe73e760b2..66e7cf0a3c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -224,7 +224,7 @@
-define(CONSUMER_INFO_KEYS,
[queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
- arguments]).
+ single_active, arguments]).
warn_file_limit() ->
DurableQueues = find_recoverable_queues(),
@@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
[Q#amqqueue.name, ChPid, CTag,
- AckRequired, Prefetch, Args]) ||
- {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
+ AckRequired, Prefetch, SingleActive, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)].
stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index caa4dfe093..af42d68359 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -120,6 +120,8 @@
effective_policy_definition,
exclusive_consumer_pid,
exclusive_consumer_tag,
+ single_active_consumer_pid,
+ single_active_consumer_tag,
consumers,
consumer_utilisation,
memory,
@@ -362,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName, ActingUser) ||
- {Ch, CTag, _, _, _} <-
+ {Ch, CTag, _, _, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -847,6 +849,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
active_consumer = Holder1},
+ maybe_notify_consumer_updated(State2, Holder, Holder1),
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@@ -864,11 +867,12 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
case CurrentSingleActiveConsumer of
{DownChPid, _} ->
+ % the single active consumer is on the down channel, we have to replace it
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
- false ->
+ _ ->
CurrentSingleActiveConsumer
end;
new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
@@ -1032,6 +1036,14 @@ i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_act
ConsumerTag;
i(exclusive_consumer_tag, _) ->
'';
+i(single_active_consumer_pid, #q{active_consumer = {ChPid, _Consumer}, single_active_consumer_on = true}) ->
+ ChPid;
+i(single_active_consumer_pid, _) ->
+ '';
+i(single_active_consumer_tag, #q{active_consumer = {_ChPid, Consumer}, single_active_consumer_on = true}) ->
+ rabbit_queue_consumers:consumer_tag(Consumer);
+i(single_active_consumer_tag, _) ->
+ '';
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
@@ -1196,8 +1208,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From, State = #q{consumers = Consumers}) ->
+handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
reply(rabbit_queue_consumers:all(Consumers), State);
+handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) ->
+ reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State);
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1250,7 +1264,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- active_consumer = NewConsumer}};
+ active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
@@ -1271,7 +1285,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
end,
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- active_consumer = ExclusiveConsumer}}
+ active_consumer = ExclusiveConsumer}}
end
end,
case ConsumerRegistration of
@@ -1281,9 +1295,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
QName = qname(State1),
AckRequired = not NoAck,
+ TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
+ IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ true;
+ _ ->
+ false
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
+ PrefetchCount, IsSingleActiveConsumer, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
@@ -1305,6 +1326,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
),
State1 = State#q{consumers = Consumers1,
active_consumer = Holder1},
+ maybe_notify_consumer_updated(State1, Holder, Holder1),
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@@ -1392,6 +1414,24 @@ new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleA
_ -> CurrentSingleActiveConsumer
end.
+maybe_notify_consumer_updated(#q{single_active_consumer_on = false}, _, _) ->
+ ok;
+maybe_notify_consumer_updated(#q{single_active_consumer_on = true}, SingleActiveConsumer, SingleActiveConsumer) ->
+ % the single active consumer didn't change, nothing to do
+ ok;
+maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _PreviousConsumer, NewConsumer) ->
+ case NewConsumer of
+ {ChPid, Consumer} ->
+ {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer),
+ rabbit_core_metrics:consumer_updated(
+ ChPid, Tag, false, Ack, qname(State),
+ Prefetch, true, Args
+ ),
+ ok;
+ _ ->
+ ok
+ end.
+
handle_cast(init, State) ->
try
init_it({no_barrier, non_clean_shutdown}, none, State)
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index c8b3ad10b8..9c516bda3d 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) ->
({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Id, Pid, _} = Key, _, _, _, _}, none)
+ ({{Id, Pid, _} = Key, _, _, _, _, _}, none)
when Table == consumer_created ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index a83f755e7e..8f34c01210 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -43,6 +43,7 @@
query_consumer_count/1,
query_consumers/1,
query_stat/1,
+ query_single_active_consumer/1,
usage/1,
zero/1,
@@ -590,11 +591,14 @@ maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_act
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
name = Name,
prefix_msg_counts = {0, 0},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Pids = lists:usort(maps:keys(Enqs)
+ ++ [P || {_, P} <- maps:keys(Cons)]
+ ++ [P || {{_, P}, _} <- WaitingConsumers]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
@@ -610,9 +614,12 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs,
- consumers = Custs0}) ->
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
- [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
+ WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0),
+ AllConsumers = maps:merge(Custs, WaitingConsumers1),
+ [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
@@ -623,14 +630,13 @@ tick(_Ts, #state{name = Name,
queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- consumers = Cons,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
- maps:size(Cons), % Consumers
+ query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
@@ -705,11 +711,21 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
maps:size(Consumers) + length(WaitingConsumers).
-query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) ->
+query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ true;
+ _ ->
+ false
+ end
+ end,
FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
+ IsSingleActiveConsumerFun({Tag, Pid}),
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)}
end, Consumers),
@@ -718,12 +734,24 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
{Pid, Tag,
maps:get(ack, Meta, undefined),
maps:get(prefetch, Meta, undefined),
+ IsSingleActiveConsumerFun({Tag, Pid}),
maps:get(args, Meta, []),
maps:get(username, Meta, undefined)},
Acc)
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
+query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) ->
+ case maps:size(Consumers) of
+ 1 ->
+ {value, lists:nth(1, maps:keys(Consumers))};
+ _
+ ->
+ {error, illegal_size}
+ end ;
+query_single_active_consumer(_) ->
+ disabled.
+
query_stat(#state{messages = M,
consumers = Consumers}) ->
{maps:size(M), maps:size(Consumers)}.
@@ -797,7 +825,8 @@ cancel_consumer(ConsumerId,
State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- {Effects, State1};
+ Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects),
+ {Effects1, State1};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
@@ -807,6 +836,13 @@ cancel_consumer(ConsumerId,
{Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
+consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active,
+ queue_resource = QName },
+ ConsumerId, #consumer{meta = Meta}, Effects) ->
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
+ [QName, ConsumerId, false, maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects].
+
cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
@@ -1372,6 +1408,8 @@ test_init(Name) ->
atom_to_binary(Name, utf8)),
shadow_copy_interval => 0}).
+% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo"
+
enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
@@ -2076,8 +2114,8 @@ single_active_consumer_test() ->
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#state.waiting_consumers)),
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects2)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects2)),
% cancelling the active consumer
{State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
@@ -2086,8 +2124,8 @@ single_active_consumer_test() ->
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#state.waiting_consumers)),
- % there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects3)),
+ % there are some effects to unregister the consumer and to update the new active one (metrics)
+ ?assertEqual(2, length(Effects3)),
% cancelling the last consumer
{State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
@@ -2131,8 +2169,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % the effect to unregister the consumer is there
- ?assertEqual(1, length(Effects)),
+ % effects to unregister the consumer and to update the new active one (metrics) are there
+ ?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
{State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
@@ -2140,8 +2178,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
?assertEqual(0, length(State3#state.waiting_consumers)),
- % effects to cancel both consumers of this channel
- ?assertEqual(2, length(Effects2)),
+ % effects to cancel both consumers of this channel + effect to update the new active one (metrics)
+ ?assertEqual(3, length(Effects2)),
% the last channel goes down
{State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
@@ -2192,6 +2230,64 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
ok.
+single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = state_enter(leader, State1),
+ % 2 effects for each consumer process (channel process), 1 effect for the node
+ ?assertEqual(2 * 3 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = state_enter(eol, State1),
+ % 1 effect for each consumer process (channel process)
+ ?assertEqual(3, length(Effects)).
+
query_consumers_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
@@ -2214,7 +2310,7 @@ query_consumers_test() ->
?assertEqual(4, query_consumer_count(State1)),
Consumers = query_consumers(State1),
?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) ->
+ maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) ->
?assertEqual(self(), Pid)
end, [], Consumers).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index e743fbce18..704c1a46ae 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -16,13 +16,13 @@
-module(rabbit_queue_consumers).
--export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
+-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
- consumer_tag/1]).
+ consumer_tag/1, get_infos/1]).
%%----------------------------------------------------------------------------
@@ -100,7 +100,9 @@
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
+-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer().
-spec consumer_tag(consumer()) -> rabbit_types:ctag().
+-spec get_infos(consumer()) -> term().
%%----------------------------------------------------------------------------
@@ -115,16 +117,25 @@ max_active_priority(#state{consumers = Consumers}) ->
inactive(#state{consumers = Consumers}) ->
priority_queue:is_empty(Consumers).
-all(#state{consumers = Consumers}) ->
- lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
- consumers(Consumers, []), all_ch_record()).
+all(State) ->
+ all(State, none).
-consumers(Consumers, Acc) ->
+all(#state{consumers = Consumers}, SingleActiveConsumer) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end,
+ consumers(Consumers, SingleActiveConsumer, []), all_ch_record()).
+
+consumers(Consumers, SingleActiveConsumer, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
args = Args, user = Username} = Consumer,
- [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1]
+ IsSingleActive = case SingleActiveConsumer of
+ {ChPid, Consumer} ->
+ true;
+ _ ->
+ false
+ end,
+ [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -411,9 +422,15 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
{{value, Consumer, _Priority}, _Tail} -> Consumer
end.
+get_infos(Consumer) ->
+ {Consumer#consumer.tag,Consumer#consumer.ack_required,
+ Consumer#consumer.prefetch, Consumer#consumer.args}.
+
consumer_tag(#consumer{tag = CTag}) ->
CTag.
+
+
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index deeff4194a..5c0d1c0070 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -26,6 +26,7 @@
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
+-export([update_consumer_handler/7, update_consumer/8]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
@@ -76,7 +77,9 @@
leader,
online,
members,
- open_files
+ open_files,
+ single_active_consumer_pid,
+ single_active_consumer_ctag
]).
-define(TICK_TIME, 1000). %% the ra server tick time
@@ -161,17 +164,16 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]).
+
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+ catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
+ QName, Prefetch, SingleActive, Args).
+
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- Node = node(ChPid),
- case Node == node() of
- true -> cancel_consumer(QName, ChPid, ConsumerTag);
- false ->
- %% this could potentially block for a while if the node is
- %% in disconnected state or tcp buffers are full
- rpc:cast(Node, rabbit_quorum_queue,
- cancel_consumer,
- [QName, ChPid, ConsumerTag])
- end.
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
@@ -181,6 +183,17 @@ cancel_consumer(QName, ChPid, ConsumerTag) ->
{queue, QName},
{user_who_performed_action, ?INTERNAL_USER}]).
+local_or_remote_handler(ChPid, Module, Function, Args) ->
+ Node = node(ChPid),
+ case Node == node() of
+ true ->
+ erlang:apply(Module, Function, Args);
+ false ->
+ %% this could potentially block for a while if the node is
+ %% in disconnected state or tcp buffers are full
+ rpc:cast(Node, Module, Function, Args)
+ end.
+
become_leader(QName, Name) ->
Fun = fun(Q1) ->
Q1#amqqueue{pid = {Name, node()},
@@ -401,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
+basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
ActingUser, OkMsg, QState0) ->
%% TODO: validate consumer arguments
@@ -423,10 +436,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
Prefetch,
ConsumerMeta,
QState0),
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ IsSingleActiveConsumer = case SacResult of
+ {value, {ConsumerTag, ChPid}} ->
+ true;
+ _ ->
+ false
+ end,
+
%% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
- ConsumerPrefetchCount, Args),
+ ConsumerPrefetchCount, IsSingleActiveConsumer, Args),
{ok, QState}.
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
@@ -740,6 +762,24 @@ i(open_files, #amqqueue{pid = {Name, _},
quorum_nodes = Nodes}) ->
{Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
lists:flatten(Data);
+i(single_active_consumer_pid, #amqqueue{pid = QPid}) ->
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ case SacResult of
+ {value, {_ConsumerTag, ChPid}} ->
+ ChPid;
+ _ ->
+ ''
+ end;
+i(single_active_consumer_ctag, #amqqueue{pid = QPid}) ->
+ {ok, {_, SacResult}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1),
+ case SacResult of
+ {value, {ConsumerTag, _ChPid}} ->
+ ConsumerTag;
+ _ ->
+ ''
+ end;
i(_K, _Q) -> ''.
open_files(Name) ->