summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-18 15:44:43 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-18 15:44:43 +0100
commit84675c2e4d3c32dad2333bc1830ca83640af749e (patch)
treec4646df2cc489500d09b000a8347ac07669d8708
parentffc233c1a2fe8b18b7ae6e9738646a4133b07067 (diff)
downloadrabbitmq-server-git-84675c2e4d3c32dad2333bc1830ca83640af749e.tar.gz
Update active flag for consumers
Flag is true by default. Can be set to false in QQ when the consumer node is suspected to be down. When single active consumer is enabled, on the queue, only one consumer is active at a time. [#163298456] References #1838
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_fifo.erl139
-rw-r--r--src/rabbit_quorum_queue.erl18
3 files changed, 129 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index af42d68359..abf2f64f8f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1296,15 +1296,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
QName = qname(State1),
AckRequired = not NoAck,
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
- IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
- {true, TheConsumer} ->
- true;
- _ ->
- false
- end,
+ ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ true;
+ {true, _} ->
+ false;
+ {false, _} ->
+ true
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, IsSingleActiveConsumer, Args),
+ PrefetchCount, ConsumerIsActive, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 8f34c01210..7f99f1e6e4 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -452,24 +452,26 @@ apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
- {Cons, State} = maps:fold(
- fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- %% TODO: need to increment credit here
- %% with the size of the Checked map
- Credit = increase_credit(C, maps:size(Checked0)),
- {maps:put(K, C#consumer{suspected_down = true,
- credit = Credit,
- checked_out = #{}}, Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ {Cons, State, Effects1} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0, Eff}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ %% TODO: need to increment credit here
+ %% with the size of the Checked map
+ Credit = increase_credit(C, maps:size(Checked0)),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St, Eff1};
+ (K, C, {Co, St, Eff}) ->
+ {maps:put(K, C, Co), St, Eff}
+ end, {#{}, State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -477,14 +479,14 @@ apply(_, {down, ConsumerPid, noconnection},
% mark waiting consumers as suspected if necessary
WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true),
- Effects = case maps:size(Cons) of
- 0 ->
- [{aux, inactive}, {monitor, node, Node}];
- _ ->
- [{monitor, node, Node}]
- end,
+ Effects2 = case maps:size(Cons) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects};
+ {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects2};
apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -529,6 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
(_, Acc) -> Acc
end, [], WaitingConsumers0),
+ % FIXME deduplicate the list of PID to monitor (some consumers can share the same channel)
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers],
% un-suspect waiting consumers when necessary
@@ -538,12 +541,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
E#enqueuer{suspected_down = false};
(_, E) -> E
end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
when node(P) =:= Node ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
- CAcc, SQAcc, EAcc);
+ CAcc, SQAcc, EAcc1);
(_, _, Acc) ->
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
@@ -555,6 +560,15 @@ apply(_, {nodedown, _Node}, State) ->
apply(_, #update_config{config = Conf}, State) ->
{update_config(Conf, State), ok}.
+consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
+ fun(State, ConsumerId, Consumer, Active, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects)
+ end;
+consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
+ fun(_, _, _, _, Effects) ->
+ Effects
+ end.
+
maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) ->
{[], State};
maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active,
@@ -575,10 +589,10 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st
maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) ->
[];
maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active,
- waiting_consumers = []}, _Suspected) ->
+ waiting_consumers = []}, _Suspected) ->
[];
maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers}, Suspected) ->
+ waiting_consumers = WaitingConsumers}, Suspected) ->
[begin
case node(P) of
Node ->
@@ -825,7 +839,7 @@ cancel_consumer(ConsumerId,
State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects),
+ Effects1 = consumer_update_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, true, Effects),
{Effects1, State1};
error ->
% The cancelled consumer is not the active one
@@ -836,12 +850,11 @@ cancel_consumer(ConsumerId,
{Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
-consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active,
- queue_resource = QName },
- ConsumerId, #consumer{meta = Meta}, Effects) ->
+consumer_update_active_effects(#state{queue_resource = QName },
+ ConsumerId, #consumer{meta = Meta}, Active, Effects) ->
[{mod_call, rabbit_quorum_queue, update_consumer_handler,
[QName, ConsumerId, false, maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects].
+ maps:get(prefetch, Meta, undefined), Active, maps:get(args, Meta, [])]} | Effects].
cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
@@ -2314,6 +2327,72 @@ query_consumers_test() ->
?assertEqual(self(), Pid)
end, [], Consumers).
+active_flag_updated_when_consumer_suspected_unsuspected_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ % only 1 effect to monitor the node
+ ?assertEqual(1, length(Effects2)),
+
+ {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(4, length(Effects3)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c0d1c0070..dc616899f4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]).
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]).
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, SingleActive, Args).
+ QName, Prefetch, Active, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
@@ -414,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
+basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
ActingUser, OkMsg, QState0) ->
%% TODO: validate consumer arguments
@@ -438,8 +438,12 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
QState0),
{ok, {_, SacResult}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1),
- IsSingleActiveConsumer = case SacResult of
- {value, {ConsumerTag, ChPid}} ->
+
+ SingleActiveConsumerOn = single_active_consumer_on(Q),
+ IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ true;
+ {true, {value, {ConsumerTag, ChPid}}} ->
true;
_ ->
false