diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-01-28 20:49:05 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-28 20:49:05 +0400 |
| commit | dd947c6081e5497e74aa2ba8ff4386971bac69dc (patch) | |
| tree | eff2f2f31d572615bc2453378f3b760d1323bb9f | |
| parent | a4b602567081b28c4bc53ac5995b5c054a305da9 (diff) | |
| parent | dcf663fcf99eeefb1c0de07e7389b47988cfbd92 (diff) | |
| download | rabbitmq-server-git-dd947c6081e5497e74aa2ba8ff4386971bac69dc.tar.gz | |
Merge pull request #1839 from rabbitmq/rabbitmq-server-1838-active-field-for-consumers
Update active flag for consumers
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 244 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 31 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 30 |
8 files changed, 283 insertions, 93 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 66e7cf0a3c..63d89ff4a7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -224,7 +224,7 @@ -define(CONSUMER_INFO_KEYS, [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, - single_active, arguments]). + active, activity_status, arguments]). warn_file_limit() -> DurableQueues = find_recoverable_queues(), @@ -958,8 +958,8 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) -> get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, [Q#amqqueue.name, ChPid, CTag, - AckRequired, Prefetch, SingleActive, Args]) || - {ChPid, CTag, AckRequired, Prefetch, SingleActive, Args, _} <- consumers(Q)]. + AckRequired, Prefetch, Active, ActivityStatus, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q); stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index af42d68359..5782bc6e23 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -364,7 +364,7 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || - {Ch, CTag, _, _, _, _, _} <- + {Ch, CTag, _, _, _, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -1211,7 +1211,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State = #q{consumers = Consumers, single_active_consumer_on = false}) -> reply(rabbit_queue_consumers:all(Consumers), State); handle_call(consumers, _From, State = #q{consumers = Consumers, active_consumer = ActiveConsumer}) -> - reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer), State); + reply(rabbit_queue_consumers:all(Consumers, ActiveConsumer, true), State); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1296,15 +1296,18 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, QName = qname(State1), AckRequired = not NoAck, TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), - IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of - {true, TheConsumer} -> - true; - _ -> - false - end, + {ConsumerIsActive, ActivityStatus} = + case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + {true, single_active}; + {true, _} -> + {false, waiting}; + {false, _} -> + {true, up} + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, IsSingleActiveConsumer, Args), + PrefetchCount, ConsumerIsActive, ActivityStatus, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), @@ -1425,7 +1428,7 @@ maybe_notify_consumer_updated(#q{single_active_consumer_on = true} = State, _Pre {Tag, Ack, Prefetch, Args} = rabbit_queue_consumers:get_infos(Consumer), rabbit_core_metrics:consumer_updated( ChPid, Tag, false, Ack, qname(State), - Prefetch, true, Args + Prefetch, true, single_active, Args ), ok; _ -> diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 9c516bda3d..d4c065e64f 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -160,7 +160,7 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({{Id, Pid, _} = Key, _, _, _, _, _}, none) + ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c3b5e66355..612cd52dcc 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -460,22 +460,24 @@ apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - Credit = increase_credit(C, maps:size(Checked0)), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + {Cons, State, Effects1} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -483,17 +485,17 @@ apply(_, {down, ConsumerPid, noconnection}, % mark waiting consumers as suspected if necessary WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true), - Effects = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end, + Effects2 = case maps:size(Cons) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, %% TODO: should we run a checkout here? {State#state{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects}; + waiting_consumers = WaitingConsumers}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -531,12 +533,14 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, E#enqueuer{suspected_down = false}; (_, E) -> E end, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) when node(P) =:= Node -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, - CAcc, SQAcc, EAcc); + CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc end, {Cons0, SQ0, Monitors}, Cons0), @@ -549,6 +553,15 @@ apply(_, {nodedown, _Node}, State) -> apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). +consumer_active_flag_update_function(#state{consumer_strategy = default}) -> + fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> + consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) + end; +consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> + fun(_, _, _, _, _, Effects) -> + Effects + end. + handle_waiting_consumer_down(_Pid, #state{consumer_strategy = default} = State) -> {[], State}; @@ -718,31 +731,48 @@ query_consumer_count(#state{consumers = Consumers, maps:size(Consumers) + length(WaitingConsumers). query_consumers(#state{consumers = Consumers, - waiting_consumers = WaitingConsumers} = State) -> - SingleActiveConsumer = query_single_active_consumer(State), - IsSingleActiveConsumerFun = fun({Tag, Pid} = _ConsumerId) -> - case SingleActiveConsumer of - {value, {Tag, Pid}} -> - true; - _ -> - false - end - end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + waiting_consumers = WaitingConsumers, + consumer_strategy = ConsumerStrategy } = State) -> + ActiveActivityStatusFun = case ConsumerStrategy of + default -> + fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) -> + case SuspectedDown of + true -> + {false, suspected_down}; + false -> + {true, up} + end + end; + single_active -> + SingleActiveConsumer = query_single_active_consumer(State), + fun({Tag, Pid} = _Consumer, _) -> + case SingleActiveConsumer of + {value, {Tag, Pid}} -> + {true, single_active}; + _ -> + {false, waiting} + end + end + end, + FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), - IsSingleActiveConsumerFun({Tag, Pid}), + Active, + ActivityStatus, maps:get(args, Meta, []), maps:get(username, Meta, undefined)} end, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, maps:get(ack, Meta, undefined), maps:get(prefetch, Meta, undefined), - IsSingleActiveConsumerFun({Tag, Pid}), + Active, + ActivityStatus, maps:get(args, Meta, []), maps:get(username, Meta, undefined)}, Acc) @@ -752,6 +782,8 @@ query_consumers(#state{consumers = Consumers, query_single_active_consumer(#state{consumer_strategy = single_active, consumers = Consumers}) -> case maps:size(Consumers) of + 0 -> + {error, no_value}; 1 -> {value, lists:nth(1, maps:keys(Consumers))}; _ @@ -852,8 +884,10 @@ cancel_consumer(ConsumerId, NewActiveConsumer}, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - add_consumer_promotion_effect(State, NewActiveConsumerId, - NewActiveConsumer, Effects1); + Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, + NewActiveConsumer, true, + single_active, Effects1), + {State, Effects2}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -865,18 +899,18 @@ cancel_consumer(ConsumerId, {State0#state{waiting_consumers = WaitingConsumers}, Effects} end. -add_consumer_promotion_effect(#state{consumer_strategy = single_active, - queue_resource = QName} = State, - ConsumerId, #consumer{meta = Meta}, - Effects) -> +consumer_update_active_effects(#state{queue_resource = QName }, + ConsumerId, #consumer{meta = Meta}, + Active, ActivityStatus, + Effects) -> Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), Args = maps:get(args, Meta, []), - {State, [{mod_call, - rabbit_quorum_queue, - update_consumer_handler, - [QName, ConsumerId, false, Ack, Prefetch, true, Args]} - | Effects]}. + [{mod_call, + rabbit_quorum_queue, + update_consumer_handler, + [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} + | Effects]. cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0) -> @@ -2429,6 +2463,44 @@ query_consumers_test() -> queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), shadow_copy_interval => 0, + single_active_consumer_on => false}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{index => 1}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + Consumers0 = State1#state.consumers, + Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0), + State2 = State1#state{consumers = Consumers1}, + + ?assertEqual(4, query_consumer_count(State2)), + Consumers2 = query_consumers(State2), + ?assertEqual(4, maps:size(Consumers2)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag2">> -> + ?assertNot(Active), + ?assertEqual(suspected_down, ActivityStatus); + _ -> + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end + end, [], Consumers2). + +query_consumers_when_single_active_consumer_is_on_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, single_active_consumer_on => true}), Meta = #{index => 1}, % adding some consumers @@ -2446,10 +2518,84 @@ query_consumers_test() -> ?assertEqual(4, query_consumer_count(State1)), Consumers = query_consumers(State1), ?assertEqual(4, maps:size(Consumers)), - maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) -> - ?assertEqual(self(), Pid) + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag1">> -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end end, [], Consumers). +active_flag_updated_when_consumer_suspected_unsuspected_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + % only 1 effect to monitor the node + ?assertEqual(1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(4, length(Effects3)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 704c1a46ae..7a0c0f98e3 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_consumers). --export([new/0, max_active_priority/1, inactive/1, all/1, all/2, count/0, +-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, @@ -118,24 +118,32 @@ inactive(#state{consumers = Consumers}) -> priority_queue:is_empty(Consumers). all(State) -> - all(State, none). - -all(#state{consumers = Consumers}, SingleActiveConsumer) -> - lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, Acc) end, - consumers(Consumers, SingleActiveConsumer, []), all_ch_record()). - -consumers(Consumers, SingleActiveConsumer, Acc) -> + all(State, none, false). + +all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end, + consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()). + +consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) -> + ActiveActivityStatusFun = case SingleActiveConsumerOn of + true -> + fun({ChPid, Consumer}) -> + case SingleActiveConsumer of + {ChPid, Consumer} -> + {true, single_active}; + _ -> + {false, waiting} + end + end; + false -> + fun(_) -> {true, up} end + end, priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, args = Args, user = Username} = Consumer, - IsSingleActive = case SingleActiveConsumer of - {ChPid, Consumer} -> - true; - _ -> - false - end, - [{ChPid, CTag, Ack, Prefetch, IsSingleActive, Args, Username} | Acc1] + {Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}), + [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 1f3aa1f38b..114af4fdfb 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -26,7 +26,7 @@ -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). --export([update_consumer_handler/7, update_consumer/8]). +-export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). @@ -169,13 +169,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. -update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, - [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]). + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]). -update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, - QName, Prefetch, SingleActive, Args). + QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). @@ -419,7 +419,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, +basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, ActingUser, OkMsg, QState0) -> %% TODO: validate consumer arguments @@ -443,17 +443,22 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, QState0), {ok, {_, SacResult}, _} = ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1), - IsSingleActiveConsumer = case SacResult of - {value, {ConsumerTag, ChPid}} -> - true; - _ -> - false - end, + + SingleActiveConsumerOn = single_active_consumer_on(Q), + {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + {true, up}; + {true, {value, {ConsumerTag, ChPid}}} -> + {true, single_active}; + _ -> + {false, waiting} + end, %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, Args), + ConsumerPrefetchCount, IsSingleActiveConsumer, + ActivityStatus, Args), {ok, QState}. basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 0518c05dbd..7ae43aa7e1 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -300,7 +300,7 @@ consumer_metrics(Config) -> CTag = <<"tag">>, rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, consumer_created, [DeadPid, CTag, true, true, - QName, 1, false, []]), + QName, 1, false, waiting, []]), Id = {QName, DeadPid, CTag}, [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]), diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl index 08b2da4574..39838b0874 100644 --- a/test/unit_queue_consumers_SUITE.erl +++ b/test/unit_queue_consumers_SUITE.erl @@ -25,7 +25,8 @@ all() -> [ is_same, get_consumer, - get + get, + list_consumers ]. is_same(_Config) -> @@ -79,6 +80,33 @@ get_consumer(_Config) -> ), ok. +list_consumers(_Config) -> + State = state(consumers([consumer(self(), <<"1">>), consumer(self(), <<"2">>), consumer(self(), <<"3">>)])), + Consumer = rabbit_queue_consumers:get_consumer(State), + {_Pid, ConsumerRecord} = Consumer, + CTag = rabbit_queue_consumers:consumer_tag(ConsumerRecord), + ConsumersWithSingleActive = rabbit_queue_consumers:all(State, Consumer, true), + ?assertEqual(3, length(ConsumersWithSingleActive)), + lists:foldl(fun({Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + CTag -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end + end, [], ConsumersWithSingleActive), + ConsumersNoSingleActive = rabbit_queue_consumers:all(State, none, false), + ?assertEqual(3, length(ConsumersNoSingleActive)), + lists:foldl(fun({Pid, _, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end, [], ConsumersNoSingleActive), + ok. + consumers([]) -> priority_queue:new(); consumers(Consumers) -> |
