diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-18 15:44:43 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-18 15:44:43 +0100 |
| commit | 84675c2e4d3c32dad2333bc1830ca83640af749e (patch) | |
| tree | c4646df2cc489500d09b000a8347ac07669d8708 | |
| parent | ffc233c1a2fe8b18b7ae6e9738646a4133b07067 (diff) | |
| download | rabbitmq-server-git-84675c2e4d3c32dad2333bc1830ca83640af749e.tar.gz | |
Update active flag for consumers
Flag is true by default. Can be set to false in QQ when the consumer
node is suspected to be down. When single active consumer is enabled, on
the queue, only one consumer is active at a time.
[#163298456]
References #1838
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 139 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 18 |
3 files changed, 129 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index af42d68359..abf2f64f8f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1296,15 +1296,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, QName = qname(State1), AckRequired = not NoAck, TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers), - IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of - {true, TheConsumer} -> - true; - _ -> - false - end, + ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of + {true, TheConsumer} -> + true; + {true, _} -> + false; + {false, _} -> + true + end, rabbit_core_metrics:consumer_created( ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, IsSingleActiveConsumer, Args), + PrefetchCount, ConsumerIsActive, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, PrefetchCount, Args, none, ActingUser), diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 8f34c01210..7f99f1e6e4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -452,24 +452,26 @@ apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - %% TODO: need to increment credit here - %% with the size of the Checked map - Credit = increase_credit(C, maps:size(Checked0)), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + {Cons, State, Effects1} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -477,14 +479,14 @@ apply(_, {down, ConsumerPid, noconnection}, % mark waiting consumers as suspected if necessary WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true), - Effects = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end, + Effects2 = case maps:size(Cons) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects}; + {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects2}; apply(_, {down, Pid, _Info}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -529,6 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, (_, Acc) -> Acc end, [], WaitingConsumers0), + % FIXME deduplicate the list of PID to monitor (some consumers can share the same channel) Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers], % un-suspect waiting consumers when necessary @@ -538,12 +541,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, E#enqueuer{suspected_down = false}; (_, E) -> E end, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) when node(P) =:= Node -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc), update_or_remove_sub( ConsumerId, C#consumer{suspected_down = false}, - CAcc, SQAcc, EAcc); + CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc end, {Cons0, SQ0, Monitors}, Cons0), @@ -555,6 +560,15 @@ apply(_, {nodedown, _Node}, State) -> apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. +consumer_active_flag_update_function(#state{consumer_strategy = default}) -> + fun(State, ConsumerId, Consumer, Active, Effects) -> + consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects) + end; +consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> + fun(_, _, _, _, Effects) -> + Effects + end. + maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) -> {[], State}; maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active, @@ -575,10 +589,10 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) -> []; maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active, - waiting_consumers = []}, _Suspected) -> + waiting_consumers = []}, _Suspected) -> []; maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, Suspected) -> + waiting_consumers = WaitingConsumers}, Suspected) -> [begin case node(P) of Node -> @@ -825,7 +839,7 @@ cancel_consumer(ConsumerId, State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects), + Effects1 = consumer_update_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, true, Effects), {Effects1, State1}; error -> % The cancelled consumer is not the active one @@ -836,12 +850,11 @@ cancel_consumer(ConsumerId, {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. -consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active, - queue_resource = QName }, - ConsumerId, #consumer{meta = Meta}, Effects) -> +consumer_update_active_effects(#state{queue_resource = QName }, + ConsumerId, #consumer{meta = Meta}, Active, Effects) -> [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects]. + maps:get(prefetch, Meta, undefined), Active, maps:get(args, Meta, [])]} | Effects]. cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> @@ -2314,6 +2327,72 @@ query_consumers_test() -> ?assertEqual(self(), Pid) end, [], Consumers). +active_flag_updated_when_consumer_suspected_unsuspected_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1), + % only 1 effect to monitor the node + ?assertEqual(1, length(Effects2)), + + {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(4, length(Effects3)). + meta(Idx) -> #{index => Idx, term => 1}. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c0d1c0070..dc616899f4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) -> _ -> false end. -update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, - [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]). + [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]). -update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) -> +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) -> catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, - QName, Prefetch, SingleActive, Args). + QName, Prefetch, Active, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). @@ -414,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, +basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, ActingUser, OkMsg, QState0) -> %% TODO: validate consumer arguments @@ -438,8 +438,12 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid, QState0), {ok, {_, SacResult}, _} = ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1), - IsSingleActiveConsumer = case SacResult of - {value, {ConsumerTag, ChPid}} -> + + SingleActiveConsumerOn = single_active_consumer_on(Q), + IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of + {false, _} -> + true; + {true, {value, {ConsumerTag, ChPid}}} -> true; _ -> false |
