summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_fifo.erl139
-rw-r--r--src/rabbit_quorum_queue.erl18
3 files changed, 129 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index af42d68359..abf2f64f8f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1296,15 +1296,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
QName = qname(State1),
AckRequired = not NoAck,
TheConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, State1#q.consumers),
- IsSingleActiveConsumer = case {SingleActiveConsumerOn, State1#q.active_consumer} of
- {true, TheConsumer} ->
- true;
- _ ->
- false
- end,
+ ConsumerIsActive = case {SingleActiveConsumerOn, State1#q.active_consumer} of
+ {true, TheConsumer} ->
+ true;
+ {true, _} ->
+ false;
+ {false, _} ->
+ true
+ end,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, IsSingleActiveConsumer, Args),
+ PrefetchCount, ConsumerIsActive, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
AckRequired, QName, PrefetchCount,
Args, none, ActingUser),
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 8f34c01210..7f99f1e6e4 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -452,24 +452,26 @@ apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
- {Cons, State} = maps:fold(
- fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- %% TODO: need to increment credit here
- %% with the size of the Checked map
- Credit = increase_credit(C, maps:size(Checked0)),
- {maps:put(K, C#consumer{suspected_down = true,
- credit = Credit,
- checked_out = #{}}, Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ {Cons, State, Effects1} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0, Eff}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ %% TODO: need to increment credit here
+ %% with the size of the Checked map
+ Credit = increase_credit(C, maps:size(Checked0)),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false, Eff),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St, Eff1};
+ (K, C, {Co, St, Eff}) ->
+ {maps:put(K, C, Co), St, Eff}
+ end, {#{}, State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -477,14 +479,14 @@ apply(_, {down, ConsumerPid, noconnection},
% mark waiting consumers as suspected if necessary
WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true),
- Effects = case maps:size(Cons) of
- 0 ->
- [{aux, inactive}, {monitor, node, Node}];
- _ ->
- [{monitor, node, Node}]
- end,
+ Effects2 = case maps:size(Cons) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects};
+ {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects2};
apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -529,6 +531,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
(_, Acc) -> Acc
end, [], WaitingConsumers0),
+ % FIXME deduplicate the list of PID to monitor (some consumers can share the same channel)
Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers],
% un-suspect waiting consumers when necessary
@@ -538,12 +541,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
E#enqueuer{suspected_down = false};
(_, E) -> E
end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
when node(P) =:= Node ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, EAcc),
update_or_remove_sub(
ConsumerId, C#consumer{suspected_down = false},
- CAcc, SQAcc, EAcc);
+ CAcc, SQAcc, EAcc1);
(_, _, Acc) ->
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
@@ -555,6 +560,15 @@ apply(_, {nodedown, _Node}, State) ->
apply(_, #update_config{config = Conf}, State) ->
{update_config(Conf, State), ok}.
+consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
+ fun(State, ConsumerId, Consumer, Active, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active, Effects)
+ end;
+consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
+ fun(_, _, _, _, Effects) ->
+ Effects
+ end.
+
maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) ->
{[], State};
maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active,
@@ -575,10 +589,10 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st
maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) ->
[];
maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active,
- waiting_consumers = []}, _Suspected) ->
+ waiting_consumers = []}, _Suspected) ->
[];
maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers}, Suspected) ->
+ waiting_consumers = WaitingConsumers}, Suspected) ->
[begin
case node(P) of
Node ->
@@ -825,7 +839,7 @@ cancel_consumer(ConsumerId,
State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
- Effects1 = consumer_promoted_to_single_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, Effects),
+ Effects1 = consumer_update_active_effects(State1, NewActiveConsumerId, NewActiveConsumer, true, Effects),
{Effects1, State1};
error ->
% The cancelled consumer is not the active one
@@ -836,12 +850,11 @@ cancel_consumer(ConsumerId,
{Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
-consumer_promoted_to_single_active_effects(#state{consumer_strategy = single_active,
- queue_resource = QName },
- ConsumerId, #consumer{meta = Meta}, Effects) ->
+consumer_update_active_effects(#state{queue_resource = QName },
+ ConsumerId, #consumer{meta = Meta}, Active, Effects) ->
[{mod_call, rabbit_quorum_queue, update_consumer_handler,
[QName, ConsumerId, false, maps:get(ack, Meta, undefined),
- maps:get(prefetch, Meta, undefined), true, maps:get(args, Meta, [])]} | Effects].
+ maps:get(prefetch, Meta, undefined), Active, maps:get(args, Meta, [])]} | Effects].
cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
@@ -2314,6 +2327,72 @@ query_consumers_test() ->
?assertEqual(self(), Pid)
end, [], Consumers).
+active_flag_updated_when_consumer_suspected_unsuspected_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => false}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
+ ?assertEqual(4 + 1, length(Effects2)),
+
+ {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
+ ?assertEqual(4 + 4, length(Effects3)).
+
+active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ % only 1 effect to monitor the node
+ ?assertEqual(1, length(Effects2)),
+
+ {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ % for each consumer: 1 effect to monitor the consumer PID
+ ?assertEqual(4, length(Effects3)).
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c0d1c0070..dc616899f4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -164,13 +164,13 @@ single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
_ -> false
end.
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, Args) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args]).
+ [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args]).
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, SingleActive, Args) ->
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, Args) ->
catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, SingleActive, Args).
+ QName, Prefetch, Active, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
@@ -414,7 +414,7 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
+basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
ActingUser, OkMsg, QState0) ->
%% TODO: validate consumer arguments
@@ -438,8 +438,12 @@ basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum}, NoAck, ChPid,
QState0),
{ok, {_, SacResult}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1),
- IsSingleActiveConsumer = case SacResult of
- {value, {ConsumerTag, ChPid}} ->
+
+ SingleActiveConsumerOn = single_active_consumer_on(Q),
+ IsSingleActiveConsumer = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ true;
+ {true, {value, {ConsumerTag, ChPid}}} ->
true;
_ ->
false