diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-08 11:15:52 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-08 11:15:52 +0100 |
| commit | 377661dbd1d608ec09ad81d5873944ab551f50d0 (patch) | |
| tree | 7b0debf2419ce3adea11c551217861082fd3798e /src | |
| parent | f841d7a34810a89055096c2ec6ddeb73fbf30cf8 (diff) | |
| download | rabbitmq-server-git-377661dbd1d608ec09ad81d5873944ab551f50d0.tar.gz | |
Address comments after single active consumer review
References #1799
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 74 |
1 files changed, 39 insertions, 35 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 706094037f..33e843c7e9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_fifo). @@ -178,6 +178,8 @@ suspected_down = false :: boolean() }). +-type consumer() :: #consumer{}. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -234,10 +236,10 @@ msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), %% whether single active consumer is on or not for this queue - single_active_consumer_on = false :: boolean(), + consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: list() + waiting_consumers = [] :: [{consumer_id(), consumer()}] }). -opaque state() :: #state{}. @@ -246,7 +248,8 @@ queue_resource := rabbit_types:r('queue'), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), - shadow_copy_interval => non_neg_integer()}. + shadow_copy_interval => non_neg_integer(), + single_active_consumer_on => boolean()}. -export_type([protocol/0, delivery/0, @@ -273,11 +276,16 @@ update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - SingleActiveConsumerOn = maps:get(single_active_consumer_on, Conf, false), + ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of + true -> + single_active; + false -> + default + end, State#state{dead_letter_handler = DLH, become_leader_handler = BLH, shadow_copy_interval = SHI, - single_active_consumer_on = SingleActiveConsumerOn}. + consumer_strategy = ConsumerStrategy}. zero(_) -> 0. @@ -705,51 +713,47 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - {Effects0, #state{single_active_consumer_on = false} = S0}) -> + {Effects0, #state{consumer_strategy = default} = S0}) -> %% general case, single active consumer off cancel_consumer0(ConsumerId, {Effects0, S0}); cancel_consumer(ConsumerId, - {Effects0, #state{single_active_consumer_on = true, - waiting_consumers = WaitingConsumers } = S0}) when length(WaitingConsumers) == 0 -> + {Effects0, #state{consumer_strategy = single_active, + waiting_consumers = [] } = S0}) -> %% single active consumer on, no consumers are waiting cancel_consumer0(ConsumerId, {Effects0, S0}); cancel_consumer(ConsumerId, - {Effects0, #state{consumers = Cons0, - single_active_consumer_on = true, - waiting_consumers = WaitingConsumers0 } = State0}) -> + {Effects0, #state{consumers = Cons0, + consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0 } = State0}) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of {_CurrentActiveConsumer, _} -> % The active consumer is to be removed % Cancel it {Effects1, State1} = case maps:take(ConsumerId, Cons0) of - {#consumer{checked_out = Checked0}, _} -> - S = return_all(State0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - {Effects, State0}; - error -> - {Effects0, State0} - end, + {#consumer{checked_out = Checked0}, _} -> + S = return_all(State0, Checked0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + {Effects, State0}; + error -> + {Effects0, State0} + end, % Take another one from the waiting consumers and put it in consumers - {NewActiveConsumerId, NewActiveConsumer} = lists:nth(1, WaitingConsumers0), - WaitingConsumers1 = lists:delete({NewActiveConsumerId, NewActiveConsumer}, WaitingConsumers0), + [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, #state{service_queue = ServiceQueue} = State0, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), State2 = State1#state{consumers = #{NewActiveConsumerId => NewActiveConsumer}, service_queue = ServiceQueue1, - waiting_consumers = WaitingConsumers1}, + waiting_consumers = RemainingWaitingConsumers}, {Effects1, State2}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - case lists:keytake(ConsumerId, 1, WaitingConsumers0) of - {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} -> - S = return_all(State0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - {Effects, #state{waiting_consumers = WaitingConsumers1}}; - false -> - {Effects0, State0} - end + {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} = + lists:keytake(ConsumerId, 1, WaitingConsumers0), + S = return_all(State0, Checked0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), + {Effects, #state{waiting_consumers = WaitingConsumers1}} end. cancel_consumer0(ConsumerId, @@ -1149,17 +1153,17 @@ uniq_queue_in(Key, Queue) -> update_consumer(ConsumerId, Meta, Spec, - #state{single_active_consumer_on = false} = State0) -> + #state{consumer_strategy = default} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, - #state{consumers = Cons0, - single_active_consumer_on = true} = State0) when map_size(Cons0) == 0 -> + #state{consumers = Cons0, + consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{single_active_consumer_on = true, - waiting_consumers = WaitingConsumers0} = State0) -> + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list Consumer = #consumer{lifetime = Life, meta = Meta, |
