summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-11-22 10:30:47 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-11-22 10:30:47 +0100
commitfcdf570de6017ad8642e53302bef074580761bd2 (patch)
treef3f8a0e2be361cdcd31c511332d9f5d75d53b130
parent744a307dea1befdadc030b453dfa0eeab74aae29 (diff)
downloadrabbitmq-server-git-fcdf570de6017ad8642e53302bef074580761bd2.tar.gz
Use single active consumer term when appropriate
References #1743
-rw-r--r--src/rabbit_amqqueue_process.erl106
-rw-r--r--src/rabbit_queue_consumers.erl18
-rw-r--r--test/single_active_consumer_SUITE.erl (renamed from test/exclusive_consumer_SUITE.erl)2
3 files changed, 63 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2570d1f830..ddf024719f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,8 +36,8 @@
-record(q, {
%% an #amqqueue record
q,
- %% none | {exclusive consumer channel PID, consumer tag} | {exclusive consumer channel PID, consumer}
- exclusive_consumer,
+ %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
+ active_consumer,
%% Set to true if a queue has ever had a consumer.
%% This is used to determine when to delete auto-delete queues.
has_had_consumers,
@@ -96,7 +96,7 @@
%% running | flow | idle
status,
%% true | false
- exclusive_consumer_on
+ single_active_consumer_on
}).
%%----------------------------------------------------------------------------
@@ -157,20 +157,20 @@ init(Q) ->
?MODULE}.
init_state(Q) ->
- ExclusiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
+ SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
{bool, true} -> true;
_ -> false
end,
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- consumers = rabbit_queue_consumers:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0,
- overflow = 'drop-head',
- exclusive_consumer_on = ExclusiveConsumerOn},
+ State = #q{q = Q,
+ active_consumer = none,
+ has_had_consumers = false,
+ consumers = rabbit_queue_consumers:new(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0,
+ overflow = 'drop-head',
+ single_active_consumer_on = SingleActiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -552,10 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{exclusive_consumer_on = true}) ->
+assert_invariant(#q{single_active_consumer_on = true}) ->
%% queue may contain messages and have available consumers with exclusive consumer
ok;
-assert_invariant(State = #q{consumers = Consumers, exclusive_consumer_on = false}) ->
+assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -630,7 +630,7 @@ run_message_queue(ActiveConsumersChanged, State) ->
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
qname(State), State#q.consumers,
- State#q.exclusive_consumer_on, State#q.exclusive_consumer) of
+ State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
@@ -656,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
- end, qname(State), State#q.consumers, State#q.exclusive_consumer_on, State#q.exclusive_consumer) of
+ end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -816,10 +816,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{consumers = Consumers,
- exclusive_consumer = Holder,
- exclusive_consumer_on = ExclusiveConsumerOn,
- senders = Senders}) ->
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn,
+ senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false ->
Senders;
@@ -843,9 +843,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
[emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
- Holder1 = new_exclusive_consumer_after_channel_down(DownPid, Holder, ExclusiveConsumerOn, Consumers1),
+ Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@@ -860,18 +860,18 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end
end.
-new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = true, Consumers) ->
- case CurrentExclusiveConsumer of
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
+ case CurrentSingleActiveConsumer of
{DownChPid, _} ->
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
false ->
- CurrentExclusiveConsumer
+ CurrentSingleActiveConsumer
end;
-new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = false, _Consumers) ->
- case CurrentExclusiveConsumer of
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
{DownChPid, _} -> none;
Other -> Other
end.
@@ -1023,15 +1023,15 @@ i(effective_policy_definition, #q{q = Q}) ->
undefined -> [];
Def -> Def
end;
-i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
+i(exclusive_consumer_pid, #q{active_consumer = none}) ->
'';
-i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTagOrConsumer}}) ->
+i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTagOrConsumer}}) ->
ChPid;
-i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+i(exclusive_consumer_tag, #q{active_consumer = none}) ->
'';
-i(exclusive_consumer_tag, #q{exclusive_consumer_on = true, exclusive_consumer = {_ChPid, Consumer}}) ->
+i(exclusive_consumer_tag, #q{single_active_consumer_on = true, active_consumer = {_ChPid, Consumer}}) ->
rabbit_queue_consumers:consumer_tag(Consumer);
-i(exclusive_consumer_tag, #q{exclusive_consumer_on = false, exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+i(exclusive_consumer_tag, #q{single_active_consumer_on = false, active_consumer = {_ChPid, ConsumerTag}}) ->
ConsumerTag;
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
@@ -1232,9 +1232,9 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
_From, State = #q{consumers = Consumers,
- exclusive_consumer = Holder,
- exclusive_consumer_on = ExclusiveConsumerOn}) ->
- ConsumerRegistration = case ExclusiveConsumerOn of
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn}) ->
+ ConsumerRegistration = case SingleActiveConsumerOn of
true ->
case ExclusiveConsume of
true ->
@@ -1251,7 +1251,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- exclusive_consumer = NewConsumer}};
+ active_consumer = NewConsumer}};
_ ->
{state, State#q{consumers = Consumers1,
has_had_consumers = true}}
@@ -1271,7 +1271,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
end,
{state, State#q{consumers = Consumers1,
has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer}}
+ active_consumer = ExclusiveConsumer}}
end
end,
case ConsumerRegistration of
@@ -1292,19 +1292,19 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Holder,
- exclusive_consumer_on = ExclusiveConsumerOn }) ->
+ State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn }) ->
ok = maybe_send_reply(ChPid, OkMsg),
case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
Consumers1 ->
- Holder1 = new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag,
- Holder, ExclusiveConsumerOn, Consumers1
+ Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag,
+ Holder, SingleActiveConsumerOn, Consumers1
),
State1 = State#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@@ -1374,22 +1374,22 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
-new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer,
- _ExclusiveConsumerIsOn = true, Consumers) ->
- case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentExclusiveConsumer) of
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = true, Consumers) ->
+ case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of
true ->
case rabbit_queue_consumers:get_consumer(Consumers) of
undefined -> none;
Consumer -> Consumer
end;
false ->
- CurrentExclusiveConsumer
+ CurrentSingleActiveConsumer
end;
-new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer,
- _ExclusiveConsumerIsOn = false, _Consumers) ->
- case CurrentExclusiveConsumer of
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
{ChPid, ConsumerTag} -> none;
- _ -> CurrentExclusiveConsumer
+ _ -> CurrentSingleActiveConsumer
end.
handle_cast(init, State) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 074c32b387..f32d261d20 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -193,34 +193,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State, ExclusiveConsumerIsOn, ExclusiveConsumer) ->
- deliver(FetchFun, QName, false, State, ExclusiveConsumerIsOn, ExclusiveConsumer).
+deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
+ deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
deliver(_FetchFun, _QName, false, State, true, none) ->
{undelivered, false,
State#state{use = update_use(State#state.use, inactive)}};
-deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) ->
- {ChPid, Consumer} = ExclusiveConsumer,
+deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) ->
+ {ChPid, Consumer} = SingleActiveConsumer,
%% blocked consumers are removed from the queue state, but not the exclusive_consumer field,
%% so we need to do this check to avoid adding the exclusive consumer to the channel record
%% over and over
- case is_blocked(ExclusiveConsumer) of
+ case is_blocked(SingleActiveConsumer) of
true ->
{undelivered, false,
State#state{use = update_use(State#state.use, inactive)}};
false ->
- case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of
+ case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of
{delivered, R} ->
{delivered, false, R, State};
undelivered ->
- {ChPid, Consumer} = ExclusiveConsumer,
+ {ChPid, Consumer} = SingleActiveConsumer,
Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
{undelivered, true,
State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
end
end;
deliver(FetchFun, QName, ConsumersChanged,
- State = #state{consumers = Consumers}, false, _ExclusiveConsumer) ->
+ State = #state{consumers = Consumers}, false, _SingleActiveConsumer) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
@@ -233,7 +233,7 @@ deliver(FetchFun, QName, ConsumersChanged,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
- State#state{consumers = Tail}, false, _ExclusiveConsumer)
+ State#state{consumers = Tail}, false, _SingleActiveConsumer)
end
end.
diff --git a/test/exclusive_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 1ba05b9841..9670744b2b 100644
--- a/test/exclusive_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
%%
--module(exclusive_consumer_SUITE).
+-module(single_active_consumer_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").