summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:09:30 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:09:30 +0100
commitb4d354546d44799f9cdab979c02eecb4245e67ad (patch)
tree91f036e9ee19aa726c9af1d93c95014ecd3dae25 /src
parent6bec963773f5687c9fa24a7ce902d2bc9fa272c2 (diff)
downloadrabbitmq-server-git-b4d354546d44799f9cdab979c02eecb4245e67ad.tar.gz
Support single active consumer in quorum queue
Uses a buffer list for non-active consumers. The active consumer is stored in the usual consumers structure, so the logic around servicing consumers is kept the same. [#162582065] Fixes #1799
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl102
-rw-r--r--src/rabbit_quorum_queue.erl12
2 files changed, 98 insertions, 16 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 2d5c267227..867e291d65 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -226,7 +226,12 @@
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()}
+ PrefixMsgs :: non_neg_integer()},
+ %% whether single active consumer is on or not for this queue
+ single_active_consumer_on = false :: boolean(),
+ %% waiting consumers, one is picked active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: list()
}).
-opaque state() :: #state{}.
@@ -262,9 +267,11 @@ update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ SingleActiveConsumerOn = maps:get(single_active_consumer_on, Conf, false),
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
- shadow_copy_interval = SHI}.
+ shadow_copy_interval = SHI,
+ single_active_consumer_on = SingleActiveConsumerOn}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@@ -667,6 +674,54 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
+ {Effects0, #state{single_active_consumer_on = false} = S0}) ->
+ %% general case, single active consumer off
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{single_active_consumer_on = true,
+ waiting_consumers = WaitingConsumers } = S0}) when length(WaitingConsumers) == 0 ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{consumers = Cons0,
+ single_active_consumer_on = true,
+ waiting_consumers = WaitingConsumers0 } = State0}) ->
+ %% single active consumer on, consumers are waiting
+ case maps:take(ConsumerId, Cons0) of
+ {_CurrentActiveConsumer, _} ->
+ % The active consumer is to be removed
+ % Cancel it
+ {Effects1, State1} = case maps:take(ConsumerId, Cons0) of
+ {#consumer{checked_out = Checked0}, _} ->
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ {Effects, State0};
+ error ->
+ {Effects0, State0}
+ end,
+ % Take another one from the waiting consumers and put it in consumers
+ {NewActiveConsumerId, NewActiveConsumer} = lists:nth(1, WaitingConsumers0),
+ WaitingConsumers1 = lists:delete({NewActiveConsumerId, NewActiveConsumer}, WaitingConsumers0),
+ #state{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
+ State2 = State1#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = WaitingConsumers1},
+ {Effects1, State2};
+ error ->
+ % The cancelled consumer is not the active one
+ % Just remove it from idle_consumers
+ case lists:keytake(ConsumerId, 1, WaitingConsumers0) of
+ {value, {ConsumerId, #consumer{checked_out = Checked0}}, WaitingConsumers1} ->
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ {Effects, #state{waiting_consumers = WaitingConsumers1}};
+ false ->
+ {Effects0, State0}
+ end
+ end.
+
+cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
@@ -677,7 +732,7 @@ cancel_consumer(ConsumerId,
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
_ ->
{Effects, S#state{consumers = Cons}}
- end;
+ end;
error ->
% already removed - do nothing
{Effects0, S0}
@@ -1053,23 +1108,42 @@ uniq_queue_in(Key, Queue) ->
end.
-update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+update_consumer(ConsumerId, Meta, Spec,
+ #state{single_active_consumer_on = false} = State0) ->
+ %% general case, single active consumer off
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, Spec,
#state{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ single_active_consumer_on = true} = State0) when map_size(Cons0) == 0 ->
+ %% single active consumer on, no one is consuming yet
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+ #state{single_active_consumer_on = true,
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ %% single active consumer on and one active consumer already
+ %% adding the new consumer to the waiting list
+ Consumer = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
+ State0#state{waiting_consumers = WaitingConsumers1}.
+
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+ #state{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
- fun(S) ->
- %% remove any in-flight messages from
- %% the credit update
- N = maps:size(S#consumer.checked_out),
- C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
- end, Init, Cons0),
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life,
+ credit = C}
+ end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index bb8af13b9d..b6c3207ff4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,7 +37,8 @@
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
--include_lib("rabbit_common/include/rabbit.hrl").
+%%-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@@ -151,7 +152,14 @@ ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
- metrics_handler => {?MODULE, update_metrics, [QName]}}.
+ metrics_handler => {?MODULE, update_metrics, [QName]},
+ single_active_consumer_on => single_active_consumer_on(Q)}.
+
+single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
+ case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),