diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-11-22 10:30:47 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-11-22 10:30:47 +0100 |
| commit | fcdf570de6017ad8642e53302bef074580761bd2 (patch) | |
| tree | f3f8a0e2be361cdcd31c511332d9f5d75d53b130 | |
| parent | 744a307dea1befdadc030b453dfa0eeab74aae29 (diff) | |
| download | rabbitmq-server-git-fcdf570de6017ad8642e53302bef074580761bd2.tar.gz | |
Use single active consumer term when appropriate
References #1743
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 18 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl (renamed from test/exclusive_consumer_SUITE.erl) | 2 |
3 files changed, 63 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2570d1f830..ddf024719f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,8 @@ -record(q, { %% an #amqqueue record q, - %% none | {exclusive consumer channel PID, consumer tag} | {exclusive consumer channel PID, consumer} - exclusive_consumer, + %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer} + active_consumer, %% Set to true if a queue has ever had a consumer. %% This is used to determine when to delete auto-delete queues. has_had_consumers, @@ -96,7 +96,7 @@ %% running | flow | idle status, %% true | false - exclusive_consumer_on + single_active_consumer_on }). %%---------------------------------------------------------------------------- @@ -157,20 +157,20 @@ init(Q) -> ?MODULE}. init_state(Q) -> - ExclusiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of + SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of {bool, true} -> true; _ -> false end, - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0, - overflow = 'drop-head', - exclusive_consumer_on = ExclusiveConsumerOn}, + State = #q{q = Q, + active_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0, + overflow = 'drop-head', + single_active_consumer_on = SingleActiveConsumerOn}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -552,10 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(#q{exclusive_consumer_on = true}) -> +assert_invariant(#q{single_active_consumer_on = true}) -> %% queue may contain messages and have available consumers with exclusive consumer ok; -assert_invariant(State = #q{consumers = Consumers, exclusive_consumer_on = false}) -> +assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) -> true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). @@ -630,7 +630,7 @@ run_message_queue(ActiveConsumersChanged, State) -> false -> case rabbit_queue_consumers:deliver( fun(AckRequired) -> fetch(AckRequired, State) end, qname(State), State#q.consumers, - State#q.exclusive_consumer_on, State#q.exclusive_consumer) of + State#q.single_active_consumer_on, State#q.active_consumer) of {delivered, ActiveConsumersChanged1, State1, Consumers} -> run_message_queue( ActiveConsumersChanged or ActiveConsumersChanged1, @@ -656,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} - end, qname(State), State#q.consumers, State#q.exclusive_consumer_on, State#q.exclusive_consumer) of + end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -816,10 +816,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{consumers = Consumers, - exclusive_consumer = Holder, - exclusive_consumer_on = ExclusiveConsumerOn, - senders = Senders}) -> +handle_ch_down(DownPid, State = #q{consumers = Consumers, + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn, + senders = Senders}) -> State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of false -> Senders; @@ -843,9 +843,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, {ChAckTags, ChCTags, Consumers1} -> QName = qname(State1), [emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags], - Holder1 = new_exclusive_consumer_after_channel_down(DownPid, Holder, ExclusiveConsumerOn, Consumers1), + Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1), State2 = State1#q{consumers = Consumers1, - exclusive_consumer = Holder1}, + active_consumer = Holder1}, notify_decorators(State2), case should_auto_delete(State2) of true -> @@ -860,18 +860,18 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end end. -new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = true, Consumers) -> - case CurrentExclusiveConsumer of +new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) -> + case CurrentSingleActiveConsumer of {DownChPid, _} -> case rabbit_queue_consumers:get_consumer(Consumers) of undefined -> none; Consumer -> Consumer end; false -> - CurrentExclusiveConsumer + CurrentSingleActiveConsumer end; -new_exclusive_consumer_after_channel_down(DownChPid, CurrentExclusiveConsumer, _ExclusiveConsumerIsOn = false, _Consumers) -> - case CurrentExclusiveConsumer of +new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) -> + case CurrentSingleActiveConsumer of {DownChPid, _} -> none; Other -> Other end. @@ -1023,15 +1023,15 @@ i(effective_policy_definition, #q{q = Q}) -> undefined -> []; Def -> Def end; -i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> +i(exclusive_consumer_pid, #q{active_consumer = none}) -> ''; -i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTagOrConsumer}}) -> +i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTagOrConsumer}}) -> ChPid; -i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> +i(exclusive_consumer_tag, #q{active_consumer = none}) -> ''; -i(exclusive_consumer_tag, #q{exclusive_consumer_on = true, exclusive_consumer = {_ChPid, Consumer}}) -> +i(exclusive_consumer_tag, #q{single_active_consumer_on = true, active_consumer = {_ChPid, Consumer}}) -> rabbit_queue_consumers:consumer_tag(Consumer); -i(exclusive_consumer_tag, #q{exclusive_consumer_on = false, exclusive_consumer = {_ChPid, ConsumerTag}}) -> +i(exclusive_consumer_tag, #q{single_active_consumer_on = false, active_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); @@ -1232,9 +1232,9 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser}, _From, State = #q{consumers = Consumers, - exclusive_consumer = Holder, - exclusive_consumer_on = ExclusiveConsumerOn}) -> - ConsumerRegistration = case ExclusiveConsumerOn of + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn}) -> + ConsumerRegistration = case SingleActiveConsumerOn of true -> case ExclusiveConsume of true -> @@ -1251,7 +1251,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), {state, State#q{consumers = Consumers1, has_had_consumers = true, - exclusive_consumer = NewConsumer}}; + active_consumer = NewConsumer}}; _ -> {state, State#q{consumers = Consumers1, has_had_consumers = true}} @@ -1271,7 +1271,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, end, {state, State#q{consumers = Consumers1, has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}} + active_consumer = ExclusiveConsumer}} end end, case ConsumerRegistration of @@ -1292,19 +1292,19 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Holder, - exclusive_consumer_on = ExclusiveConsumerOn }) -> + State = #q{consumers = Consumers, + active_consumer = Holder, + single_active_consumer_on = SingleActiveConsumerOn }) -> ok = maybe_send_reply(ChPid, OkMsg), case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); Consumers1 -> - Holder1 = new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, - Holder, ExclusiveConsumerOn, Consumers1 + Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, + Holder, SingleActiveConsumerOn, Consumers1 ), State1 = State#q{consumers = Consumers1, - exclusive_consumer = Holder1}, + active_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), notify_decorators(State1), case should_auto_delete(State1) of @@ -1374,22 +1374,22 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). -new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer, - _ExclusiveConsumerIsOn = true, Consumers) -> - case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentExclusiveConsumer) of +new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer, + _SingleActiveConsumerIsOn = true, Consumers) -> + case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of true -> case rabbit_queue_consumers:get_consumer(Consumers) of undefined -> none; Consumer -> Consumer end; false -> - CurrentExclusiveConsumer + CurrentSingleActiveConsumer end; -new_exclusive_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentExclusiveConsumer, - _ExclusiveConsumerIsOn = false, _Consumers) -> - case CurrentExclusiveConsumer of +new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer, + _SingleActiveConsumerIsOn = false, _Consumers) -> + case CurrentSingleActiveConsumer of {ChPid, ConsumerTag} -> none; - _ -> CurrentExclusiveConsumer + _ -> CurrentSingleActiveConsumer end. handle_cast(init, State) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 074c32b387..f32d261d20 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -193,34 +193,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State, ExclusiveConsumerIsOn, ExclusiveConsumer) -> - deliver(FetchFun, QName, false, State, ExclusiveConsumerIsOn, ExclusiveConsumer). +deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) -> + deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer). deliver(_FetchFun, _QName, false, State, true, none) -> {undelivered, false, State#state{use = update_use(State#state.use, inactive)}}; -deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) -> - {ChPid, Consumer} = ExclusiveConsumer, +deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) -> + {ChPid, Consumer} = SingleActiveConsumer, %% blocked consumers are removed from the queue state, but not the exclusive_consumer field, %% so we need to do this check to avoid adding the exclusive consumer to the channel record %% over and over - case is_blocked(ExclusiveConsumer) of + case is_blocked(SingleActiveConsumer) of true -> {undelivered, false, State#state{use = update_use(State#state.use, inactive)}}; false -> - case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of + case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of {delivered, R} -> {delivered, false, R, State}; undelivered -> - {ChPid, Consumer} = ExclusiveConsumer, + {ChPid, Consumer} = SingleActiveConsumer, Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), {undelivered, true, State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} end end; deliver(FetchFun, QName, ConsumersChanged, - State = #state{consumers = Consumers}, false, _ExclusiveConsumer) -> + State = #state{consumers = Consumers}, false, _SingleActiveConsumer) -> case priority_queue:out_p(Consumers) of {empty, _} -> {undelivered, ConsumersChanged, @@ -233,7 +233,7 @@ deliver(FetchFun, QName, ConsumersChanged, Tail)}}; undelivered -> deliver(FetchFun, QName, true, - State#state{consumers = Tail}, false, _ExclusiveConsumer) + State#state{consumers = Tail}, false, _SingleActiveConsumer) end end. diff --git a/test/exclusive_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 1ba05b9841..9670744b2b 100644 --- a/test/exclusive_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. %% --module(exclusive_consumer_SUITE). +-module(single_active_consumer_SUITE). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). |
