diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 139 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 18 |
3 files changed, 129 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index af42d68359..abf2f64f8f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1296,15 +1296,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, QName = qname(State1), AckRequired = not NoAck, TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), - IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of - {true, TheConsumer} -> - true; - _ -> - false - end, + ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + true; + {true, _} -> + false; + {false, _} -> + true + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, IsSingleActiveConsumer, Args), + PrefetchCount, ConsumerIsActive, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 8f34c01210..7f99f1e6e4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -452,24 +452,26 @@ apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - %% TODO: need to increment credit here - %% with the size of the Checked map - Credit = increase_credit(C, maps:size(Checked0)), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + {Cons, State, Effects1} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -477,14 +479,14 @@ apply(_, {down, ConsumerPid, noconnection}, % mark waiting consumers as suspected if necessary WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true), - Effects = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end, + Effects2 = case maps:size(Cons) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects}; + {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects2}; apply(_, {down, Pid, _Info}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -529,6 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, (_, Acc) -> Acc end, [], WaitingConsumers0), + % FIXME deduplicate the list of PID to monitor (some consumers can share the same channel) Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers], % un-suspect waiting consumers when necessary @@ -538,12 +541,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, E#enqueuer{suspected_down = false}; (_, E) -> E end, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) when node(P) =:= Node -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, - CAcc, SQAcc, EAcc); + CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc end, {Cons0, SQ0, Monitors}, Cons0), @@ -555,6 +560,15 @@ apply(_, {nodedown, _Node}, State) -> apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. +consumer_active_flag_update_function(#state{consumer_strategy = default}) -> + fun(State, ConsumerId, Consumer, Active, Effects) -> + consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects) + end; +consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> + fun(_, _, _, _, Effects) -> + Effects + end. + maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) -> {[], State}; maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active, @@ -575,10 +589,10 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) -> []; maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active, - waiting_consumers = []}, _Suspected) -> + waiting_consumers = []}, _Suspected) -> []; maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, Suspected) -> + waiting_consumers = WaitingConsumers}, Suspected) -> [begin case node(P) of Node -> @@ -825,7 +839,7 @@ cancel_consumer(ConsumerId, State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects), + Effects1 = consumer_update_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, true, Effects), {Effects1, State1}; error -> % The cancelled consumer is not the active one @@ -836,12 +850,11 @@ cancel_consumer(ConsumerId, {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. -consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active, - queue_resource = QName }, - ConsumerId, #consumer{meta = Meta}, Effects) -> +consumer_update_active_effects(#state{queue_resource = QName }, + ConsumerId, #consumer{meta = Meta}, Active, Effects) -> [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects]. + maps:get(prefetch, Meta, undefined), Active, maps:get(args, Meta, [])]} | Effects]. cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> @@ -2314,6 +2327,72 @@ query_consumers_test() -> ?assertEqual(self(), Pid) end, [], Consumers). +active_flag_updated_when_consumer_suspected_unsuspected_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1), + % only 1 effect to monitor the node + ?assertEqual(1, length(Effects2)), + + {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(4, length(Effects3)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c0d1c0070..dc616899f4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. -update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, - [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]). + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]). -update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) -> catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, - QName, Prefetch, SingleActive, Args). + QName, Prefetch, Active, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). @@ -414,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, +basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, ActingUser, OkMsg, QState0) -> %% TODO: validate consumer arguments @@ -438,8 +438,12 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, QState0), {ok, {_, SacResult}, _} = ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1), - IsSingleActiveConsumer = case SacResult of - {value, {ConsumerTag, ChPid}} -> + + SingleActiveConsumerOn = single_active_consumer_on(Q), + IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + true; + {true, {value, {ConsumerTag, ChPid}}} -> true; _ -> false |
