summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-08 11:15:52 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-08 11:15:52 +0100
commit377661dbd1d608ec09ad81d5873944ab551f50d0 (patch)
tree7b0debf2419ce3adea11c551217861082fd3798e /src
parentf841d7a34810a89055096c2ec6ddeb73fbf30cf8 (diff)
downloadrabbitmq-server-git-377661dbd1d608ec09ad81d5873944ab551f50d0.tar.gz
Address comments after single active consumer review
References #1799
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl74
1 files changed, 39 insertions, 35 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 706094037f..33e843c7e9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_fifo).
@@ -178,6 +178,8 @@
suspected_down = false :: boolean()
}).
+-type consumer() :: #consumer{}.
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -234,10 +236,10 @@
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
%% whether single active consumer is on or not for this queue
- single_active_consumer_on = false :: boolean(),
+ consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: list()
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-opaque state() :: #state{}.
@@ -246,7 +248,8 @@
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- shadow_copy_interval => non_neg_integer()}.
+ shadow_copy_interval => non_neg_integer(),
+ single_active_consumer_on => boolean()}.
-export_type([protocol/0,
delivery/0,
@@ -273,11 +276,16 @@ update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
- SingleActiveConsumerOn = maps:get(single_active_consumer_on, Conf, false),
+ ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
+ true ->
+ single_active;
+ false ->
+ default
+ end,
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI,
- single_active_consumer_on = SingleActiveConsumerOn}.
+ consumer_strategy = ConsumerStrategy}.
zero(_) ->
0.
@@ -705,51 +713,47 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- {Effects0, #state{single_active_consumer_on = false} = S0}) ->
+ {Effects0, #state{consumer_strategy = default} = S0}) ->
%% general case, single active consumer off
cancel_consumer0(ConsumerId, {Effects0, S0});
cancel_consumer(ConsumerId,
- {Effects0, #state{single_active_consumer_on = true,
- waiting_consumers = WaitingConsumers } = S0}) when length(WaitingConsumers) == 0 ->
+ {Effects0, #state{consumer_strategy = single_active,
+ waiting_consumers = [] } = S0}) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, {Effects0, S0});
cancel_consumer(ConsumerId,
- {Effects0, #state{consumers = Cons0,
- single_active_consumer_on = true,
- waiting_consumers = WaitingConsumers0 } = State0}) ->
+ {Effects0, #state{consumers = Cons0,
+ consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0 } = State0}) ->
%% single active consumer on, consumers are waiting
case maps:take(ConsumerId, Cons0) of
{_CurrentActiveConsumer, _} ->
% The active consumer is to be removed
% Cancel it
{Effects1, State1} = case maps:take(ConsumerId, Cons0) of
- {#consumer{checked_out = Checked0}, _} ->
- S = return_all(State0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- {Effects, State0};
- error ->
- {Effects0, State0}
- end,
+ {#consumer{checked_out = Checked0}, _} ->
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ {Effects, State0};
+ error ->
+ {Effects0, State0}
+ end,
% Take another one from the waiting consumers and put it in consumers
- {NewActiveConsumerId, NewActiveConsumer} = lists:nth(1, WaitingConsumers0),
- WaitingConsumers1 = lists:delete({NewActiveConsumerId, NewActiveConsumer}, WaitingConsumers0),
+ [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0,
#state{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
State2 = State1#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
service_queue = ServiceQueue1,
- waiting_consumers = WaitingConsumers1},
+ waiting_consumers = RemainingWaitingConsumers},
{Effects1, State2};
error ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
- case lists:keytake(ConsumerId, 1, WaitingConsumers0) of
- {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} ->
- S = return_all(State0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- {Effects, #state{waiting_consumers = WaitingConsumers1}};
- false ->
- {Effects0, State0}
- end
+ {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} =
+ lists:keytake(ConsumerId, 1, WaitingConsumers0),
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ {Effects, #state{waiting_consumers = WaitingConsumers1}}
end.
cancel_consumer0(ConsumerId,
@@ -1149,17 +1153,17 @@ uniq_queue_in(Key, Queue) ->
update_consumer(ConsumerId, Meta, Spec,
- #state{single_active_consumer_on = false} = State0) ->
+ #state{consumer_strategy = default} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
- single_active_consumer_on = true} = State0) when map_size(Cons0) == 0 ->
+ #state{consumers = Cons0,
+ consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{single_active_consumer_on = true,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,