diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 102 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 12 |
2 files changed, 98 insertions, 16 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 2d5c267227..867e291d65 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -226,7 +226,12 @@ %% This is done so that consumers are still served in a deterministic %% order on recovery. prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), - PrefixMsgs :: non_neg_integer()} + PrefixMsgs :: non_neg_integer()}, + %% whether single active consumer is on or not for this queue + single_active_consumer_on = false :: boolean(), + %% waiting consumers, one is picked active consumer is cancelled or dies + %% used only when single active consumer is on + waiting_consumers = [] :: list() }). -opaque state() :: #state{}. @@ -262,9 +267,11 @@ update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), + SingleActiveConsumerOn = maps:get(single_active_consumer_on, Conf, false), State#state{dead_letter_handler = DLH, become_leader_handler = BLH, - shadow_copy_interval = SHI}. + shadow_copy_interval = SHI, + single_active_consumer_on = SingleActiveConsumerOn}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -667,6 +674,54 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, + {Effects0, #state{single_active_consumer_on = false} = S0}) -> + %% general case, single active consumer off + cancel_consumer0(ConsumerId, {Effects0, S0}); +cancel_consumer(ConsumerId, + {Effects0, #state{single_active_consumer_on = true, + waiting_consumers = WaitingConsumers } = S0}) when length(WaitingConsumers) == 0 -> + %% single active consumer on, no consumers are waiting + cancel_consumer0(ConsumerId, {Effects0, S0}); +cancel_consumer(ConsumerId, + {Effects0, #state{consumers = Cons0, + single_active_consumer_on = true, + waiting_consumers = WaitingConsumers0 } = State0}) -> + %% single active consumer on, consumers are waiting + case maps:take(ConsumerId, Cons0) of + {_CurrentActiveConsumer, _} -> + % The active consumer is to be removed + % Cancel it + {Effects1, State1} = case maps:take(ConsumerId, Cons0) of + {#consumer{checked_out = Checked0}, _} -> + S = return_all(State0, Checked0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + {Effects, State0}; + error -> + {Effects0, State0} + end, + % Take another one from the waiting consumers and put it in consumers + {NewActiveConsumerId, NewActiveConsumer} = lists:nth(1, WaitingConsumers0), + WaitingConsumers1 = lists:delete({NewActiveConsumerId, NewActiveConsumer}, WaitingConsumers0), + #state{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), + State2 = State1#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = WaitingConsumers1}, + {Effects1, State2}; + error -> + % The cancelled consumer is not the active one + % Just remove it from idle_consumers + case lists:keytake(ConsumerId, 1, WaitingConsumers0) of + {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} -> + S = return_all(State0, Checked0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + {Effects, #state{waiting_consumers = WaitingConsumers1}}; + false -> + {Effects0, State0} + end + end. + +cancel_consumer0(ConsumerId, {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> @@ -677,7 +732,7 @@ cancel_consumer(ConsumerId, {[{aux, inactive} | Effects], S#state{consumers = Cons}}; _ -> {Effects, S#state{consumers = Cons}} - end; + end; error -> % already removed - do nothing {Effects0, S0} @@ -1053,23 +1108,42 @@ uniq_queue_in(Key, Queue) -> end. -update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, +update_consumer(ConsumerId, Meta, Spec, + #state{single_active_consumer_on = false} = State0) -> + %% general case, single active consumer off + update_consumer0(ConsumerId, Meta, Spec, State0); +update_consumer(ConsumerId, Meta, Spec, #state{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + single_active_consumer_on = true} = State0) when map_size(Cons0) == 0 -> + %% single active consumer on, no one is consuming yet + update_consumer0(ConsumerId, Meta, Spec, State0); +update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, + #state{single_active_consumer_on = true, + waiting_consumers = WaitingConsumers0} = State0) -> + %% single active consumer on and one active consumer already + %% adding the new consumer to the waiting list + Consumer = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, + WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}], + State0#state{waiting_consumers = WaitingConsumers1}. + +update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, + #state{consumers = Cons0, + service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer Init = #consumer{lifetime = Life, meta = Meta, credit = Credit, credit_mode = Mode}, Cons = maps:update_with(ConsumerId, - fun(S) -> - %% remove any in-flight messages from - %% the credit update - N = maps:size(S#consumer.checked_out), - C = max(0, Credit - N), - S#consumer{lifetime = Life, - credit = C} - end, Init, Cons0), + fun(S) -> + %% remove any in-flight messages from + %% the credit update + N = maps:size(S#consumer.checked_out), + C = max(0, Credit - N), + S#consumer{lifetime = Life, + credit = C} + end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), - ServiceQueue0), + ServiceQueue0), State0#state{consumers = Cons, service_queue = ServiceQueue}. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index bb8af13b9d..b6c3207ff4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,7 +37,8 @@ -export([policy_changed/2]). -export([cleanup_data_dir/0]). --include_lib("rabbit_common/include/rabbit.hrl"). +%%-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). -type ra_server_id() :: {Name :: atom(), Node :: node()}. @@ -151,7 +152,14 @@ ra_machine_config(Q = #amqqueue{name = QName}) -> #{dead_letter_handler => dlx_mfa(Q), queue_resource => QName, become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}. + metrics_handler => {?MODULE, update_metrics, [QName]}, + single_active_consumer_on => single_active_consumer_on(Q)}. + +single_active_consumer_on(#amqqueue{arguments = QArguments}) -> + case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of + {bool, true} -> true; + _ -> false + end. cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> Node = node(ChPid), |
