diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 66 |
5 files changed, 122 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b1cac7d3a1..84428dd572 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -857,6 +857,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1), State2 = State1#q{consumers = Consumers1, active_consumer = Holder1}, + maybe_notify_consumer_updated(State2, Holder, Holder1), notify_decorators(State2), case should_auto_delete(State2) of true -> @@ -874,11 +875,12 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) -> case CurrentSingleActiveConsumer of {DownChPid, _} -> + % the single active consumer is on the down channel, we have to replace it case rabbit_queue_consumers:get_consumer(Consumers) of undefined -> none; Consumer -> Consumer end; - false -> + _ -> CurrentSingleActiveConsumer end; new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) -> @@ -1268,7 +1270,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), {state, State#q{consumers = Consumers1, has_had_consumers = true, - active_consumer = NewConsumer}}; + active_consumer = NewConsumer}}; _ -> {state, State#q{consumers = Consumers1, has_had_consumers = true}} @@ -1289,7 +1291,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, end, {state, State#q{consumers = Consumers1, has_had_consumers = true, - active_consumer = ExclusiveConsumer}} + active_consumer = ExclusiveConsumer}} end end, case ConsumerRegistration of @@ -1299,9 +1301,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), QName = qname(State1), AckRequired = not NoAck, + TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), + IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + true; + _ -> + false + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), + PrefetchCount, IsSingleActiveConsumer, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), @@ -1323,6 +1332,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, ), State1 = State#q{consumers = Consumers1, active_consumer = Holder1}, + maybe_notify_consumer_updated(State1, Holder, Holder1), emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), notify_decorators(State1), case should_auto_delete(State1) of @@ -1410,6 +1420,24 @@ new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleA _ -> CurrentSingleActiveConsumer end. +maybe_notify_consumer_updated(#q{single_active_consumer_on = false}, _, _) -> + ok; +maybe_notify_consumer_updated(#q{single_active_consumer_on = true}, SingleActiveConsumer, SingleActiveConsumer) -> + % the single active consumer didn't change, nothing to do + ok; +maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _PreviousConsumer, NewConsumer) -> + case NewConsumer of + {ChPid, Consumer} -> + {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer), + rabbit_core_metrics:consumer_updated( + ChPid, Tag, false, Ack, qname(State), + Prefetch, true, Args + ), + ok; + _ -> + ok + end. + handle_cast(init, State) -> try init_it({no_barrier, non_clean_shutdown}, none, State) diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index c8b3ad10b8..9c516bda3d 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({{Id, Pid, _} = Key, _, _, _, _}, none) + ({{Id, Pid, _} = Key, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4fe4d954b9..b7169bf924 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -43,6 +43,7 @@ query_consumer_count/1, query_consumers/1, query_stat/1, + query_single_active_consumer/1, usage/1, zero/1, @@ -622,14 +623,13 @@ tick(_Ts, #state{name = Name, queue_resource = QName, messages = Messages, ra_indexes = Indexes, - consumers = Cons, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, maps:size(Messages), % Ready num_checked_out(State), % checked out rabbit_fifo_index:size(Indexes), %% Total - maps:size(Cons), % Consumers + query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, [{mod_call, rabbit_quorum_queue, @@ -723,6 +723,17 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). +query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) -> + case maps:size(Consumers) of + 1 -> + {value, lists:nth(1, maps:keys(Consumers))}; + _ + -> + {error, illegal_size} + end ; +query_single_active_consumer(_) -> + disabled. + query_stat(#state{messages = M, consumers = Consumers}) -> {maps:size(M), maps:size(Consumers)}. @@ -796,7 +807,8 @@ cancel_consumer(ConsumerId, State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - {Effects, State1}; + Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects), + {Effects1, State1}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -806,6 +818,13 @@ cancel_consumer(ConsumerId, {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. +consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active, + queue_resource = QName }, + ConsumerId, #consumer{meta = Meta}, Effects) -> + [{mod_call, rabbit_quorum_queue, update_consumer_handler, + [QName, ConsumerId, false, maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects]. + cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of @@ -2075,8 +2094,8 @@ single_active_consumer_test() -> % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), - % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects2)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects2)), % cancelling the active consumer {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), @@ -2085,8 +2104,8 @@ single_active_consumer_test() -> ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), % the waiting consumer list is now empty ?assertEqual(0, length(State4#state.waiting_consumers)), - % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects3)), + % there are some effects to unregister the consumer and to update the new active one (metrics) + ?assertEqual(2, length(Effects3)), % cancelling the last consumer {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), @@ -2130,8 +2149,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(1, map_size(State2#state.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#state.waiting_consumers)), - % the effect to unregister the consumer is there - ?assertEqual(1, length(Effects)), + % effects to unregister the consumer and to update the new active one (metrics) are there + ?assertEqual(2, length(Effects)), % the channel of the active consumer and a waiting consumer goes down {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), @@ -2139,8 +2158,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ?assertEqual(1, map_size(State3#state.consumers)), % no more waiting consumer ?assertEqual(0, length(State3#state.waiting_consumers)), - % effects to cancel both consumers of this channel - ?assertEqual(2, length(Effects2)), + % effects to cancel both consumers of this channel + effect to update the new active one (metrics) + ?assertEqual(3, length(Effects2)), % the last channel goes down {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index e743fbce18..dd086c1af6 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -22,7 +22,7 @@ possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1, is_same/3, get_consumer/1, get/3, - consumer_tag/1]). + consumer_tag/1, get_infos/1]). %%---------------------------------------------------------------------------- @@ -100,7 +100,9 @@ -spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). +-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer(). -spec consumer_tag(consumer()) -> rabbit_types:ctag(). +-spec get_infos(consumer()) -> term(). %%---------------------------------------------------------------------------- @@ -411,9 +413,15 @@ get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> {{value, Consumer, _Priority}, _Tail} -> Consumer end. +get_infos(Consumer) -> + {Consumer#consumer.tag,Consumer#consumer.ack_required, + Consumer#consumer.prefetch, Consumer#consumer.args}. + consumer_tag(#consumer{tag = CTag}) -> CTag. + + %%---------------------------------------------------------------------------- parse_credit_args(Default, Args) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 9dc9b7ab85..0a3a54b052 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -26,6 +26,7 @@ -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). +-export([update_consumer_handler/7, update_consumer/8]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). @@ -162,17 +163,16 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> + local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]). + +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> + catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, + QName, Prefetch, SingleActive, Args). + cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - Node = node(ChPid), - case Node == node() of - true -> cancel_consumer(QName, ChPid, ConsumerTag); - false -> - %% this could potentially block for a while if the node is - %% in disconnected state or tcp buffers are full - rpc:cast(Node, rabbit_quorum_queue, - cancel_consumer, - [QName, ChPid, ConsumerTag]) - end. + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), @@ -182,6 +182,17 @@ cancel_consumer(QName, ChPid, ConsumerTag) -> {queue, QName}, {user_who_performed_action, ?INTERNAL_USER}]). +local_or_remote_handler(ChPid, Module, Function, Args) -> + Node = node(ChPid), + case Node == node() of + true -> + erlang:apply(Module, Function, Args); + false -> + %% this could potentially block for a while if the node is + %% in disconnected state or tcp buffers are full + rpc:cast(Node, Module, Function, Args) + end. + become_leader(QName, Name) -> Fun = fun(Q1) -> Q1#amqqueue{pid = {Name, node()}, @@ -374,7 +385,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, +basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, ActingUser, OkMsg, QState0) -> %% TODO: validate consumer arguments @@ -396,10 +407,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, Prefetch, ConsumerMeta, QState0), + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + IsSingleActiveConsumer = case SacResult of + {value, {ConsumerTag, ChPid}} -> + true; + _ -> + false + end, + %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, - ConsumerPrefetchCount, Args), + ConsumerPrefetchCount, IsSingleActiveConsumer, Args), {ok, QState}. basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> @@ -713,10 +733,24 @@ i(open_files, #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}) -> {Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]), lists:flatten(Data); -i(single_active_consumer_pid, _Q) -> - ''; -i(single_active_consumer_ctag, _Q) -> - ''; +i(single_active_consumer_pid, #amqqueue{pid = QPid}) -> + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + case SacResult of + {value, {_ConsumerTag, ChPid}} -> + ChPid; + _ -> + '' + end; +i(single_active_consumer_ctag, #amqqueue{pid = QPid}) -> + {ok, {_, SacResult}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_single_active_consumer/1), + case SacResult of + {value, {ConsumerTag, _ChPid}} -> + ConsumerTag; + _ -> + '' + end; i(_K, _Q) -> ''. open_files(Name) -> |
