summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-01-09 03:03:30 +0300
committerGitHub <noreply@github.com>2019-01-09 03:03:30 +0300
commita375473f3d612b0e8a0db95c9e3805229e3aa868 (patch)
treee0f012ac50694c3a6c9115aed2cd7615112f9090
parent267ee35abb51081a3a0a676b98d22b868720cc95 (diff)
parentd21db02c75a732839ce90ad2bd587127905b975c (diff)
downloadrabbitmq-server-git-a375473f3d612b0e8a0db95c9e3805229e3aa868.tar.gz
Merge pull request #1802 from rabbitmq/rabbitmq-server-1799-single-active-consumer-in-qq
Single active consumer
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl202
-rw-r--r--src/rabbit_fifo.erl177
-rw-r--r--src/rabbit_queue_consumers.erl75
-rw-r--r--src/rabbit_quorum_queue.erl12
-rw-r--r--test/single_active_consumer_SUITE.erl286
-rw-r--r--test/unit_queue_consumers_SUITE.erl102
7 files changed, 762 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f0d5e825d1..c65ad299ed 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -656,6 +656,7 @@ declare_args() ->
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
+ {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
{<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}].
@@ -698,6 +699,12 @@ check_max_priority_arg({Type, Val}, Args) ->
Error -> Error
end.
+check_single_active_consumer_arg({Type, Val}, Args) ->
+ case check_bool_arg({Type, Val}, Args) of
+ ok -> ok;
+ Error -> Error
+ end.
+
check_default_quorum_initial_group_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 52925ce165..37ee8d0a15 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -36,8 +36,8 @@
-record(q, {
%% an #amqqueue record
q,
- %% none | {exclusive consumer channel PID, consumer tag}
- exclusive_consumer,
+ %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
+ active_consumer,
%% Set to true if a queue has ever had a consumer.
%% This is used to determine when to delete auto-delete queues.
has_had_consumers,
@@ -94,7 +94,9 @@
%% example.
mirroring_policy_version = 0,
%% running | flow | idle
- status
+ status,
+ %% true | false
+ single_active_consumer_on
}).
%%----------------------------------------------------------------------------
@@ -155,15 +157,20 @@ init(Q) ->
?MODULE}.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- consumers = rabbit_queue_consumers:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0,
- overflow = 'drop-head'},
+ SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end,
+ State = #q{q = Q,
+ active_consumer = none,
+ has_had_consumers = false,
+ consumers = rabbit_queue_consumers:new(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0,
+ overflow = 'drop-head',
+ single_active_consumer_on = SingleActiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -545,7 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{consumers = Consumers}) ->
+assert_invariant(#q{single_active_consumer_on = true}) ->
+ %% queue may contain messages and have available consumers with exclusive consumer
+ ok;
+assert_invariant(State = #q{consumers = Consumers, single_active_consumer_on = false}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -619,7 +629,8 @@ run_message_queue(ActiveConsumersChanged, State) ->
true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
- qname(State), State#q.consumers) of
+ qname(State), State#q.consumers,
+ State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
@@ -645,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
- end, qname(State), State#q.consumers) of
+ end, qname(State), State#q.consumers, State#q.single_active_consumer_on, State#q.active_consumer) of
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -814,9 +825,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{consumers = Consumers,
- exclusive_consumer = Holder,
- senders = Senders}) ->
+handle_ch_down(DownPid, State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn,
+ senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false ->
Senders;
@@ -840,12 +852,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
[emit_consumer_deleted(DownPid, CTag, QName, ?INTERNAL_USER) || CTag <- ChCTags],
- Holder1 = case Holder of
- {DownPid, _} -> none;
- Other -> Other
- end,
+ Holder1 = new_single_active_consumer_after_channel_down(DownPid, Holder, SingleActiveConsumerOn, Consumers1),
State2 = State1#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
true ->
@@ -860,6 +869,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end
end.
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = true, Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {DownChPid, _} ->
+ case rabbit_queue_consumers:get_consumer(Consumers) of
+ undefined -> none;
+ Consumer -> Consumer
+ end;
+ false ->
+ CurrentSingleActiveConsumer
+ end;
+new_single_active_consumer_after_channel_down(DownChPid, CurrentSingleActiveConsumer, _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {DownChPid, _} -> none;
+ Other -> Other
+ end.
+
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -1007,14 +1032,14 @@ i(effective_policy_definition, #q{q = Q}) ->
undefined -> [];
Def -> Def
end;
-i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
- '';
-i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
+i(exclusive_consumer_pid, #q{active_consumer = {ChPid, _ConsumerTag}, single_active_consumer_on = false}) ->
ChPid;
-i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+i(exclusive_consumer_pid, _) ->
'';
-i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+i(exclusive_consumer_tag, #q{active_consumer = {_ChPid, ConsumerTag}, single_active_consumer_on = false}) ->
ConsumerTag;
+i(exclusive_consumer_tag, _) ->
+ '';
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
@@ -1213,49 +1238,81 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
- _From, State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
- case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use -> reply({error, exclusive_consume_unavailable}, State);
- ok -> Consumers1 = rabbit_queue_consumers:add(
- ChPid, ConsumerTag, NoAck,
- LimiterPid, LimiterActive,
- PrefetchCount, Args, is_empty(State),
- ActingUser, Consumers),
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{consumers = Consumers1,
- has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- QName = qname(State1),
- AckRequired = not NoAck,
- rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, PrefetchCount,
- Args, none, ActingUser),
- notify_decorators(State1),
- reply(ok, run_message_queue(State1))
+ _From, State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn}) ->
+ ConsumerRegistration = case SingleActiveConsumerOn of
+ true ->
+ case ExclusiveConsume of
+ true ->
+ {error, reply({error, exclusive_consume_unavailable}, State)};
+ false ->
+ Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+
+ case Holder of
+ none ->
+ NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ active_consumer = NewConsumer}};
+ _ ->
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true}}
+ end
+ end;
+ false ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
+ in_use -> {error, reply({error, exclusive_consume_unavailable}, State)};
+ ok ->
+ Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ {state, State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ active_consumer = ExclusiveConsumer}}
+ end
+ end,
+ case ConsumerRegistration of
+ {error, Reply} ->
+ Reply;
+ {state, State1} ->
+ ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, PrefetchCount,
+ Args, none, ActingUser),
+ notify_decorators(State1),
+ reply(ok, run_message_queue(State1))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
+ State = #q{consumers = Consumers,
+ active_consumer = Holder,
+ single_active_consumer_on = SingleActiveConsumerOn }) ->
ok = maybe_send_reply(ChPid, OkMsg),
case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
Consumers1 ->
- Holder1 = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
+ Holder1 = new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag,
+ Holder, SingleActiveConsumerOn, Consumers1
+ ),
State1 = State#q{consumers = Consumers1,
- exclusive_consumer = Holder1},
+ active_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
notify_decorators(State1),
case should_auto_delete(State1) of
@@ -1325,6 +1382,24 @@ handle_call(sync_mirrors, _From, State) ->
handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = true, Consumers) ->
+ case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, CurrentSingleActiveConsumer) of
+ true ->
+ case rabbit_queue_consumers:get_consumer(Consumers) of
+ undefined -> none;
+ Consumer -> Consumer
+ end;
+ false ->
+ CurrentSingleActiveConsumer
+ end;
+new_single_active_consumer_after_basic_cancel(ChPid, ConsumerTag, CurrentSingleActiveConsumer,
+ _SingleActiveConsumerIsOn = false, _Consumers) ->
+ case CurrentSingleActiveConsumer of
+ {ChPid, ConsumerTag} -> none;
+ _ -> CurrentSingleActiveConsumer
+ end.
+
handle_cast(init, State) ->
try
init_it({no_barrier, non_clean_shutdown}, none, State)
@@ -1432,7 +1507,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
run_message_queue(true, State1)
end);
-
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index dcf8d071d5..1536cd1f51 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_fifo).
@@ -178,6 +178,8 @@
suspected_down = false :: boolean()
}).
+-type consumer() :: #consumer{}.
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -232,7 +234,12 @@
prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
PrefixMsgs :: non_neg_integer()},
msg_bytes_enqueue = 0 :: non_neg_integer(),
- msg_bytes_checkout = 0 :: non_neg_integer()
+ msg_bytes_checkout = 0 :: non_neg_integer(),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = default :: default | single_active,
+ %% waiting consumers, one is picked active consumer is cancelled or dies
+ %% used only when single active consumer is on
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-opaque state() :: #state{}.
@@ -241,7 +248,8 @@
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- shadow_copy_interval => non_neg_integer()}.
+ shadow_copy_interval => non_neg_integer(),
+ single_active_consumer_on => boolean()}.
-export_type([protocol/0,
delivery/0,
@@ -268,9 +276,16 @@ update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
+ true ->
+ single_active;
+ false ->
+ default
+ end,
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
- shadow_copy_interval = SHI}.
+ shadow_copy_interval = SHI,
+ consumer_strategy = ConsumerStrategy}.
zero(_) ->
0.
@@ -693,6 +708,42 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
+ {Effects0, #state{consumer_strategy = default} = S0}) ->
+ %% general case, single active consumer off
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{consumer_strategy = single_active,
+ waiting_consumers = [] } = S0}) ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(ConsumerId, {Effects0, S0});
+cancel_consumer(ConsumerId,
+ {Effects0, #state{consumers = Cons0,
+ consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0 } = State0}) ->
+ %% single active consumer on, consumers are waiting
+ case maps:take(ConsumerId, Cons0) of
+ {_CurrentActiveConsumer = #consumer{checked_out = Checked0}, _} ->
+ % The active consumer is to be removed
+ % Cancel it
+ S = return_all(State0, Checked0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
+ % Take another one from the waiting consumers and put it in consumers
+ [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0,
+ #state{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue),
+ State1 = State0#state{consumers = #{NewActiveConsumerId => NewActiveConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = RemainingWaitingConsumers},
+ {Effects, State1};
+ error ->
+ % The cancelled consumer is not the active one
+ % Just remove it from idle_consumers
+ {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0),
+ % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here
+ {Effects0, State0#state{waiting_consumers = WaitingConsumers1}}
+ end.
+
+cancel_consumer0(ConsumerId,
{Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
@@ -703,9 +754,9 @@ cancel_consumer(ConsumerId,
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
_ ->
{Effects, S#state{consumers = Cons}}
- end;
+ end;
error ->
- % already removed - do nothing
+ %% already removed: do nothing
{Effects0, S0}
end.
@@ -1088,23 +1139,42 @@ uniq_queue_in(Key, Queue) ->
end.
+update_consumer(ConsumerId, Meta, Spec,
+ #state{consumer_strategy = default} = State0) ->
+ %% general case, single active consumer off
+ update_consumer0(ConsumerId, Meta, Spec, State0);
+update_consumer(ConsumerId, Meta, Spec,
+ #state{consumers = Cons0,
+ consumer_strategy = single_active} = State0) when map_size(Cons0) == 0 ->
+ %% single active consumer on, no one is consuming yet
+ update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ %% single active consumer on and one active consumer already
+ %% adding the new consumer to the waiting list
+ Consumer = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
+ WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
+ State0#state{waiting_consumers = WaitingConsumers1}.
+
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+ #state{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
- fun(S) ->
- %% remove any in-flight messages from
- %% the credit update
- N = maps:size(S#consumer.checked_out),
- C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
- end, Init, Cons0),
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life,
+ credit = C}
+ end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1488,7 +1558,6 @@ cancelled_checkout_out_test() ->
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- ?debugFmt("State3 ~p", [State3]),
{_State, {dequeue, {_, {_, second}}}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
@@ -1686,8 +1755,7 @@ return_prefix_msg_count_test() ->
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
- ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
ok.
@@ -1759,7 +1827,6 @@ run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1772,11 +1839,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?debugFmt("Name ~p~nS~p~nState~p~nn",
- [Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -1899,6 +1963,71 @@ down_returns_checked_out_in_order_test() ->
?assertEqual(lists:sort(Returns), Returns),
ok.
+single_active_consumer_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#state.consumer_strategy),
+ ?assertEqual(0, map_size(State0#state.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
+ ?assertEqual(3, length(State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#state.consumers)),
+ ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#state.consumers)),
+ ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#state.waiting_consumers)),
+
+ % cancelling the last consumer
+ {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#state.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#state.waiting_consumers)),
+
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 98582c8117..e743fbce18 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -11,17 +11,18 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
- credit/6, utilisation/1]).
+ credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
+ consumer_tag/1]).
%%----------------------------------------------------------------------------
@@ -42,7 +43,7 @@
acktags,
consumer_count,
%% Queue of {ChPid, #consumer{}} for consumers which have
- %% been blocked for any reason
+ %% been blocked (rate/prefetch limited) for any reason
blocked_consumers,
%% The limiter itself
limiter,
@@ -57,6 +58,9 @@
use :: {'inactive',
time_micros(), time_micros(), ratio()} |
{'active', time_micros(), ratio()}}.
+-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(),
+ prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(),
+ user::rabbit_types:username()}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@@ -81,7 +85,8 @@
state()}.
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
- rabbit_amqqueue:name(), state()) ->
+ rabbit_amqqueue:name(), state(), boolean(),
+ none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@@ -95,6 +100,7 @@
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
+-spec consumer_tag(consumer()) -> rabbit_types:ctag().
%%----------------------------------------------------------------------------
@@ -189,10 +195,34 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-
+deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
+ deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
+
+deliver(_FetchFun, _QName, false, State, true, none) ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ %% blocked (rate/prefetch limited) consumers are removed from the queue state, but not the exclusive_consumer field,
+ %% so we need to do this check to avoid adding the exclusive consumer to the channel record
+ %% over and over
+ case is_blocked(SingleActiveConsumer) of
+ true ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+ false ->
+ case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of
+ {delivered, R} ->
+ {delivered, false, R, State};
+ undelivered ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
+ {undelivered, true,
+ State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ end
+ end;
deliver(FetchFun, QName, ConsumersChanged,
- State = #state{consumers = Consumers}) ->
+ State = #state{consumers = Consumers}, false, _SingleActiveConsumer) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
@@ -205,7 +235,7 @@ deliver(FetchFun, QName, ConsumersChanged,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
- State#state{consumers = Tail})
+ State#state{consumers = Tail}, false, _SingleActiveConsumer)
end
end.
@@ -246,6 +276,10 @@ deliver_to_consumer(FetchFun,
unsent_message_count = Count + 1}),
R.
+is_blocked(Consumer = {ChPid, _C}) ->
+ #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
+ priority_queue:member(Consumer, BlockedConsumers).
+
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
@@ -357,6 +391,29 @@ utilisation(#state{use = {active, Since, Avg}}) ->
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) ->
+ true;
+is_same(_ChPid, _ConsumerTag, _Consumer) ->
+ false.
+
+get_consumer(#state{consumers = Consumers}) ->
+ case priority_queue:out_p(Consumers) of
+ {{value, Consumer, _Priority}, _Tail} -> Consumer;
+ {empty, _} -> undefined
+ end.
+
+get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
+ Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP == ChPid) and (CT == ConsumerTag)
+ end, Consumers),
+ case priority_queue:out_p(Consumers1) of
+ {empty, _} -> undefined;
+ {{value, Consumer, _Priority}, _Tail} -> Consumer
+ end.
+
+consumer_tag(#consumer{tag = CTag}) ->
+ CTag.
+
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index c43ef4dfd4..ce5b4b5b6b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,7 +37,8 @@
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
--include_lib("rabbit_common/include/rabbit.hrl").
+%%-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@@ -150,7 +151,14 @@ ra_machine_config(Q = #amqqueue{name = QName,
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
- become_leader_handler => {?MODULE, become_leader, [QName]}}.
+ become_leader_handler => {?MODULE, become_leader, [QName]},
+ single_active_consumer_on => single_active_consumer_on(Q)}.
+
+single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
+ case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
new file mode 100644
index 0000000000..07da4a8d08
--- /dev/null
+++ b/test/single_active_consumer_SUITE.erl
@@ -0,0 +1,286 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(single_active_consumer_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, classic_queue}, {group, quorum_queue}
+ ].
+
+groups() ->
+ [
+ {classic_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ amqp_exclusive_consume_fails_on_exclusive_consumer_queue
+ ]},
+ {quorum_queue, [], [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled,
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(classic_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"classic">>}
+ ],
+ auto_delete = true}
+ } | Config];
+init_per_group(quorum_queue, Config) ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config].
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+all_messages_go_to_one_consumer(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 5,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount)],
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(2, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount, maps:get(CTag1, MessagesPerConsumer)),
+ ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag3} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, MessageCount) -> MessageCount > 0 end, MessagesPerConsumer1)),
+ ?assertEqual(1, length(FirstActiveConsumerInList)),
+
+ FirstActiveConsumer = lists:nth(1, FirstActiveConsumerInList),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = FirstActiveConsumer}),
+
+ {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+ SecondActiveConsumerInList = maps:keys(maps:filter(
+ fun(CTag, MessageCount) -> MessageCount > 0 andalso CTag /= FirstActiveConsumer end,
+ MessagesPerConsumer2)
+ ),
+ ?assertEqual(1, length(SecondActiveConsumerInList)),
+ SecondActiveConsumer = lists:nth(1, SecondActiveConsumerInList),
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
+ wait_for_messages(1),
+
+ LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(MessageCount, MessageCount),
+ ?assertEqual(3, maps:size(MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2, maps:get(FirstActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(MessageCount div 2 - 1, maps:get(SecondActiveConsumer, MessagesPerConsumer)),
+ ?assertEqual(1, maps:get(LastActiveConsumer, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ {C1, Ch1} = connection_and_channel(Config),
+ {C2, Ch2} = connection_and_channel(Config),
+ {C3, Ch3} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ MessageCount = 10,
+ Consumer1Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2}]),
+ Consumer2Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ Consumer3Pid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, MessageCount div 2 - 1}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"1">>}, Consumer1Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch2, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"2">>}, Consumer2Pid),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch3, #'basic.consume'{queue = Q, no_ack = true, consumer_tag = <<"3">>}, Consumer3Pid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
+
+ {MessagesPerConsumer1, MessageCount1} = consume_results(),
+ ?assertEqual(MessageCount div 2, MessageCount1),
+ ?assertEqual(1, maps:size(MessagesPerConsumer1)),
+ ?assertEqual(MessageCount div 2, maps:get(CTag1, MessagesPerConsumer1)),
+
+ ok = amqp_channel:close(Ch1),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
+
+ {MessagesPerConsumer2, MessageCount2} = consume_results(),
+ ?assertEqual(MessageCount div 2 - 1, MessageCount2),
+ ?assertEqual(1, maps:size(MessagesPerConsumer2)),
+
+ ok = amqp_channel:close(Ch2),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"poison">>}),
+
+ {MessagesPerConsumer3, MessageCount3} = consume_results(),
+ ?assertEqual(1, MessageCount3),
+ ?assertEqual(1, maps:size(MessagesPerConsumer3)),
+
+ [amqp_connection:close(Conn) || Conn <- [C1, C2, C3, C]],
+ ok.
+
+amqp_exclusive_consume_fails_on_exclusive_consumer_queue(Config) ->
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 403, _}}, _},
+ amqp_channel:call(Ch, #'basic.consume'{queue = Q, exclusive = true})
+ ),
+ amqp_connection:close(C),
+ ok.
+
+connection_and_channel(Config) ->
+ C = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Ch} = amqp_connection:open_channel(C),
+ {C, Ch}.
+
+queue_declare(Channel, Config) ->
+ Declare = ?config(single_active_consumer_queue_declare, Config),
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, Declare),
+ Q.
+
+consume({Parent, State, 0}) ->
+ Parent ! {consumer_done, State};
+consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown});
+ {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = <<"poison">>}} ->
+ Parent ! {consumer_done,
+ {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1}};
+ {#'basic.deliver'{consumer_tag = CTag}, _Content} ->
+ NewState = {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1},
+ Parent ! {message, NewState},
+ consume({Parent, NewState, CountDown - 1});
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ Parent ! {cancel_ok, CTag},
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown});
+ _ ->
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
+ after 10000 ->
+ Parent ! {consumer_timeout, {MessagesPerConsumer, MessageCount}},
+ exit(consumer_timeout)
+ end.
+
+consume_results() ->
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ {consumer_timeout, {MessagesPerConsumer, MessageCount}} ->
+ {MessagesPerConsumer, MessageCount};
+ _ ->
+ consume_results()
+ after 1000 ->
+ throw(failed)
+ end.
+
+wait_for_messages(ExpectedCount) ->
+ wait_for_messages(ExpectedCount, {}).
+
+wait_for_messages(0, State) ->
+ State;
+wait_for_messages(ExpectedCount, _) ->
+ receive
+ {message, {MessagesPerConsumer, MessageCount}} ->
+ wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
+ after 5000 ->
+ throw(message_waiting_timeout)
+ end.
+
+wait_for_cancel_ok() ->
+ receive
+ {cancel_ok, CTag} ->
+ {cancel_ok, CTag}
+ after 5000 ->
+ throw(consumer_cancel_ok_timeout)
+ end.
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
new file mode 100644
index 0000000000..08d12e7ec5
--- /dev/null
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -0,0 +1,102 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(unit_queue_consumers_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ is_same,
+ get_consumer,
+ get
+ ].
+
+is_same(_Config) ->
+ ?assertEqual(
+ true,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"1">>)
+ )),
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"2">>)
+ )),
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(Pid, <<"1">>)
+ )),
+ ok.
+
+get(_Config) ->
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {Pid, {consumer, <<"2">>, _, _, _, _}} =
+ rabbit_queue_consumers:get(Pid, <<"2">>, State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(self(), <<"2">>, State)
+ ),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(Pid, <<"1">>, State)
+ ),
+ ok.
+
+get_consumer(_Config) ->
+ Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {_Pid, {consumer, _, _, _, _, _}} =
+ rabbit_queue_consumers:get_consumer(State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get_consumer(state(consumers([])))
+ ),
+ ok.
+
+consumers([]) ->
+ priority_queue:new();
+consumers(Consumers) ->
+ consumers(Consumers, priority_queue:new()).
+
+consumers([H], Q) ->
+ priority_queue:in(H, Q);
+consumers([H | T], Q) ->
+ consumers(T, priority_queue:in(H, Q)).
+
+
+consumer(Pid, ConsumerTag) ->
+ {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}.
+
+state(Consumers) ->
+ {state, Consumers, {}}.
+
+function_for_process() ->
+ receive
+ _ -> ok
+ end. \ No newline at end of file