diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 66 |
6 files changed, 239 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fe73e760b2..66e7cf0a3c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -224,7 +224,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, - arguments]). + single_active, arguments]). warn_file_limit() -> DurableQueues = find_recoverable_queues(), @@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) -> get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, [Q#amqqueue.name, ChPid, CTag, - AckRequired, Prefetch, Args]) || - {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)]. + AckRequired, Prefetch, SingleActive, Args]) || + {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)]. stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q); stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index caa4dfe093..af42d68359 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -120,6 +120,8 @@ effective_policy_definition, exclusive_consumer_pid, exclusive_consumer_tag, + single_active_consumer_pid, + single_active_consumer_tag, consumers, consumer_utilisation, memory, @@ -362,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || - {Ch, CTag, _, _, _} <- + {Ch, CTag, _, _, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -847,6 +849,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1), State2 = State1#q{consumers = Consumers1, active_consumer = Holder1}, + maybe_notify_consumer_updated(State2, Holder, Holder1), notify_decorators(State2), case should_auto_delete(State2) of true -> @@ -864,11 +867,12 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) -> case CurrentSingleActiveConsumer of {DownChPid, _} -> + % the single active consumer is on the down channel, we have to replace it case rabbit_queue_consumers:get_consumer(Consumers) of undefined -> none; Consumer -> Consumer end; - false -> + _ -> CurrentSingleActiveConsumer end; new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) -> @@ -1032,6 +1036,14 @@ i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_act ConsumerTag; i(exclusive_consumer_tag, _) -> ''; +i(single_active_consumer_pid, #q{active_consumer = {ChPid, _Consumer}, single_active_consumer_on = true}) -> + ChPid; +i(single_active_consumer_pid, _) -> + ''; +i(single_active_consumer_tag, #q{active_consumer = {_ChPid, Consumer}, single_active_consumer_on = true}) -> + rabbit_queue_consumers:consumer_tag(Consumer); +i(single_active_consumer_tag, _) -> + ''; i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> @@ -1196,8 +1208,10 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, State = #q{consumers = Consumers}) -> +handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) -> reply(rabbit_queue_consumers:all(Consumers), State); +handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) -> + reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1250,7 +1264,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), {state, State#q{consumers = Consumers1, has_had_consumers = true, - active_consumer = NewConsumer}}; + active_consumer = NewConsumer}}; _ -> {state, State#q{consumers = Consumers1, has_had_consumers = true}} @@ -1271,7 +1285,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, end, {state, State#q{consumers = Consumers1, has_had_consumers = true, - active_consumer = ExclusiveConsumer}} + active_consumer = ExclusiveConsumer}} end end, case ConsumerRegistration of @@ -1281,9 +1295,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), QName = qname(State1), AckRequired = not NoAck, + TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), + IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + true; + _ -> + false + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), + PrefetchCount, IsSingleActiveConsumer, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), @@ -1305,6 +1326,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, ), State1 = State#q{consumers = Consumers1, active_consumer = Holder1}, + maybe_notify_consumer_updated(State1, Holder, Holder1), emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), notify_decorators(State1), case should_auto_delete(State1) of @@ -1392,6 +1414,24 @@ new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleA _ -> CurrentSingleActiveConsumer end. +maybe_notify_consumer_updated(#q{single_active_consumer_on = false}, _, _) -> + ok; +maybe_notify_consumer_updated(#q{single_active_consumer_on = true}, SingleActiveConsumer, SingleActiveConsumer) -> + % the single active consumer didn't change, nothing to do + ok; +maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _PreviousConsumer, NewConsumer) -> + case NewConsumer of + {ChPid, Consumer} -> + {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer), + rabbit_core_metrics:consumer_updated( + ChPid, Tag, false, Ack, qname(State), + Prefetch, true, Args + ), + ok; + _ -> + ok + end. + handle_cast(init, State) -> try init_it({no_barrier, non_clean_shutdown}, none, State) diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index c8b3ad10b8..9c516bda3d 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({{Id, Pid, _} = Key, _, _, _, _}, none) + ({{Id, Pid, _} = Key, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index a83f755e7e..8f34c01210 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -43,6 +43,7 @@ query_consumer_count/1, query_consumers/1, query_stat/1, + query_single_active_consumer/1, usage/1, zero/1, @@ -590,11 +591,14 @@ maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_act -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, + waiting_consumers = WaitingConsumers, name = Name, prefix_msg_counts = {0, 0}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers - Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), + Pids = lists:usort(maps:keys(Enqs) + ++ [P || {_, P} <- maps:keys(Cons)] + ++ [P || {{_, P}, _} <- WaitingConsumers]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), @@ -610,9 +614,12 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, - consumers = Custs0}) -> + consumers = Custs0, + waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), - [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; + WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), + AllConsumers = maps:merge(Custs, WaitingConsumers1), + [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; state_enter(_, _) -> %% catch all as not handling all states []. @@ -623,14 +630,13 @@ tick(_Ts, #state{name = Name, queue_resource = QName, messages = Messages, ra_indexes = Indexes, - consumers = Cons, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, maps:size(Messages), % Ready num_checked_out(State), % checked out rabbit_fifo_index:size(Indexes), %% Total - maps:size(Cons), % Consumers + query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, [{mod_call, rabbit_quorum_queue, @@ -705,11 +711,21 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) -> query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> maps:size(Consumers) + length(WaitingConsumers). -query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> +query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers} = State) -> + SingleActiveConsumer = query_single_active_consumer(State), + IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + true; + _ -> + false + end + end, FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), + IsSingleActiveConsumerFun({Tag, Pid}), maps:get(args, Meta, []), maps:get(username, Meta, undefined)} end, Consumers), @@ -718,12 +734,24 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), + IsSingleActiveConsumerFun({Tag, Pid}), maps:get(args, Meta, []), maps:get(username, Meta, undefined)}, Acc) end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). +query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) -> + case maps:size(Consumers) of + 1 -> + {value, lists:nth(1, maps:keys(Consumers))}; + _ + -> + {error, illegal_size} + end ; +query_single_active_consumer(_) -> + disabled. + query_stat(#state{messages = M, consumers = Consumers}) -> {maps:size(M), maps:size(Consumers)}. @@ -797,7 +825,8 @@ cancel_consumer(ConsumerId, State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - {Effects, State1}; + Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects), + {Effects1, State1}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -807,6 +836,13 @@ cancel_consumer(ConsumerId, {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. +consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active, + queue_resource = QName }, + ConsumerId, #consumer{meta = Meta}, Effects) -> + [{mod_call, rabbit_quorum_queue, update_consumer_handler, + [QName, ConsumerId, false, maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects]. + cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of @@ -1372,6 +1408,8 @@ test_init(Name) -> atom_to_binary(Name, utf8)), shadow_copy_interval => 0}). +% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo" + enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), @@ -2076,8 +2114,8 @@ single_active_consumer_test() -> % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), - % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects2)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects2)), % cancelling the active consumer {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), @@ -2086,8 +2124,8 @@ single_active_consumer_test() -> ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), % the waiting consumer list is now empty ?assertEqual(0, length(State4#state.waiting_consumers)), - % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects3)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects3)), % cancelling the last consumer {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), @@ -2131,8 +2169,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(1, map_size(State2#state.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#state.waiting_consumers)), - % the effect to unregister the consumer is there - ?assertEqual(1, length(Effects)), + % effects to unregister the consumer and to update the new active one (metrics) are there + ?assertEqual(2, length(Effects)), % the channel of the active consumer and a waiting consumer goes down {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), @@ -2140,8 +2178,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(1, map_size(State3#state.consumers)), % no more waiting consumer ?assertEqual(0, length(State3#state.waiting_consumers)), - % effects to cancel both consumers of this channel - ?assertEqual(2, length(Effects2)), + % effects to cancel both consumers of this channel + effect to update the new active one (metrics) + ?assertEqual(3, length(Effects2)), % the last channel goes down {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), @@ -2192,6 +2230,64 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti ok. +single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = state_enter(leader, State1), + % 2 effects for each consumer process (channel process), 1 effect for the node + ?assertEqual(2 * 3 + 1, length(Effects)). + +single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = state_enter(eol, State1), + % 1 effect for each consumer process (channel process) + ?assertEqual(3, length(Effects)). + query_consumers_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, @@ -2214,7 +2310,7 @@ query_consumers_test() -> ?assertEqual(4, query_consumer_count(State1)), Consumers = query_consumers(State1), ?assertEqual(4, maps:size(Consumers)), - maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) -> + maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) -> ?assertEqual(self(), Pid) end, [], Consumers). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index e743fbce18..704c1a46ae 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,13 +16,13 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, all/1, count/0, +-export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1, is_same/3, get_consumer/1, get/3, - consumer_tag/1]). + consumer_tag/1, get_infos/1]). %%---------------------------------------------------------------------------- @@ -100,7 +100,9 @@ -spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). +-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer(). -spec consumer_tag(consumer()) -> rabbit_types:ctag(). +-spec get_infos(consumer()) -> term(). %%---------------------------------------------------------------------------- @@ -115,16 +117,25 @@ max_active_priority(#state{consumers = Consumers}) -> inactive(#state{consumers = Consumers}) -> priority_queue:is_empty(Consumers). -all(#state{consumers = Consumers}) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, - consumers(Consumers, []), all_ch_record()). +all(State) -> + all(State, none). -consumers(Consumers, Acc) -> +all(#state{consumers = Consumers}, SingleActiveConsumer) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end, + consumers(Consumers, SingleActiveConsumer, []), all_ch_record()). + +consumers(Consumers, SingleActiveConsumer, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, args = Args, user = Username} = Consumer, - [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1] + IsSingleActive = case SingleActiveConsumer of + {ChPid, Consumer} -> + true; + _ -> + false + end, + [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -411,9 +422,15 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> {{value, Consumer, _Priority}, _Tail} -> Consumer end. +get_infos(Consumer) -> + {Consumer#consumer.tag,Consumer#consumer.ack_required, + Consumer#consumer.prefetch, Consumer#consumer.args}. + consumer_tag(#consumer{tag = CTag}) -> CTag. + + %%---------------------------------------------------------------------------- parse_credit_args(Default, Args) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index deeff4194a..5c0d1c0070 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -26,6 +26,7 @@ -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). +-export([update_consumer_handler/7, update_consumer/8]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). @@ -76,7 +77,9 @@ leader, online, members, - open_files + open_files, + single_active_consumer_pid, + single_active_consumer_ctag ]). -define(TICK_TIME, 1000). %% the ra server tick time @@ -161,17 +164,16 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> + local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]). + +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> + catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, + QName, Prefetch, SingleActive, Args). + cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - Node = node(ChPid), - case Node == node() of - true -> cancel_consumer(QName, ChPid, ConsumerTag); - false -> - %% this could potentially block for a while if the node is - %% in disconnected state or tcp buffers are full - rpc:cast(Node, rabbit_quorum_queue, - cancel_consumer, - [QName, ChPid, ConsumerTag]) - end. + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), @@ -181,6 +183,17 @@ cancel_consumer(QName, ChPid, ConsumerTag) -> {queue, QName}, {user_who_performed_action, ?INTERNAL_USER}]). +local_or_remote_handler(ChPid, Module, Function, Args) -> + Node = node(ChPid), + case Node == node() of + true -> + erlang:apply(Module, Function, Args); + false -> + %% this could potentially block for a while if the node is + %% in disconnected state or tcp buffers are full + rpc:cast(Node, Module, Function, Args) + end. + become_leader(QName, Name) -> Fun = fun(Q1) -> Q1#amqqueue{pid = {Name, node()}, @@ -401,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, +basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, ActingUser, OkMsg, QState0) -> %% TODO: validate consumer arguments @@ -423,10 +436,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, Prefetch, ConsumerMeta, QState0), + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + IsSingleActiveConsumer = case SacResult of + {value, {ConsumerTag, ChPid}} -> + true; + _ -> + false + end, + %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, - ConsumerPrefetchCount, Args), + ConsumerPrefetchCount, IsSingleActiveConsumer, Args), {ok, QState}. basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> @@ -740,6 +762,24 @@ i(open_files, #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) -> {Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]), lists:flatten(Data); +i(single_active_consumer_pid, #amqqueue{pid = QPid}) -> + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + case SacResult of + {value, {_ConsumerTag, ChPid}} -> + ChPid; + _ -> + '' + end; +i(single_active_consumer_ctag, #amqqueue{pid = QPid}) -> + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + case SacResult of + {value, {ConsumerTag, _ChPid}} -> + ConsumerTag; + _ -> + '' + end; i(_K, _Q) -> ''. open_files(Name) -> |
