summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl148
-rw-r--r--src/rabbit_queue_consumers.erl52
-rw-r--r--test/exclusive_consumer_SUITE.erl134
-rw-r--r--test/unit_queue_consumers_SUITE.erl102
5 files changed, 394 insertions, 51 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ca4fb1c019..539cc7287d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -575,7 +575,8 @@ declare_args() ->
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
- {<<"x-queue-mode">>, fun check_queue_mode/2}].
+ {<<"x-queue-mode">>, fun check_queue_mode/2},
+ {<<"x-exclusive-consumer">>, fun check_exclusive_consumer_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -616,6 +617,12 @@ check_max_priority_arg({Type, Val}, Args) ->
Error -> Error
end.
+check_exclusive_consumer_arg({Type, Val}, Args) ->
+ case check_bool_arg({Type, Val}, Args) of
+ ok -> ok;
+ Error -> Error
+ end.
+
%% Note that the validity of x-dead-letter-exchange is already verified
%% by rabbit_channel's queue.declare handler.
check_dlxname_arg({longstr, _}, _) -> ok;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 334980bf49..d6e0296e83 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,7 @@
-record(q, {
%% an #amqqueue record
q,
- %% none | {exclusive consumer channel PID, consumer tag}
+ %% none | {exclusive consumer channel PID, consumer tag} | {exclusive consumer channel PID, consumer}
exclusive_consumer,
%% Set to true if a queue has ever had a consumer.
%% This is used to determine when to delete auto-delete queues.
@@ -94,7 +94,8 @@
%% example.
mirroring_policy_version = 0,
%% running | flow | idle
- status
+ status,
+ exclusive_consumer_on
}).
%%----------------------------------------------------------------------------
@@ -155,15 +156,20 @@ init(Q) ->
?MODULE}.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- consumers = rabbit_queue_consumers:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0,
- overflow = 'drop-head'},
+ ExclusiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-exclusive-consumer">>) of
+ {bool, true} -> true;
+ _ -> false
+ end,
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ consumers = rabbit_queue_consumers:new(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0,
+ overflow = 'drop-head',
+ exclusive_consumer_on = ExclusiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -545,7 +551,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(State = #q{consumers = Consumers}) ->
+assert_invariant(#q{exclusive_consumer_on = true}) ->
+ %% queue may contain messages and have available consumers with exclusive consumer
+ ok;
+assert_invariant(State = #q{consumers = Consumers, exclusive_consumer_on = false}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -619,7 +628,8 @@ run_message_queue(ActiveConsumersChanged, State) ->
true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
- qname(State), State#q.consumers) of
+ qname(State), State#q.consumers,
+ State#q.exclusive_consumer_on, State#q.exclusive_consumer) of
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
@@ -645,7 +655,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
- end, qname(State), State#q.consumers) of
+ end, qname(State), State#q.consumers, State#q.exclusive_consumer_on, State#q.exclusive_consumer) of
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -1204,47 +1214,99 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser},
- _From, State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
- case check_exclusive_access(Holder, ExclusiveConsume, State) of
- in_use -> reply({error, exclusive_consume_unavailable}, State);
- ok -> Consumers1 = rabbit_queue_consumers:add(
- ChPid, ConsumerTag, NoAck,
- LimiterPid, LimiterActive,
- PrefetchCount, Args, is_empty(State),
- ActingUser, Consumers),
- ExclusiveConsumer =
- if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> Holder
- end,
- State1 = State#q{consumers = Consumers1,
- has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
+ _From, State = #q{consumers = Consumers,
+ exclusive_consumer = Holder,
+ exclusive_consumer_on = ExclusiveConsumerOn}) ->
+ case ExclusiveConsumerOn of
+ true ->
+ case ExclusiveConsume of
+ true ->
+ reply({error, exclusive_consume_unavailable}, State);
+ false ->
+ Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+
+ State1 = case Holder of
+ none ->
+ NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1),
+ State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ exclusive_consumer = NewConsumer};
+ _ ->
+ State#q{consumers = Consumers1,
+ has_had_consumers = true}
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
- QName = qname(State1),
- AckRequired = not NoAck,
- rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
- PrefetchCount, Args),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, PrefetchCount,
- Args, none, ActingUser),
+ AckRequired, QName, PrefetchCount,
+ Args, none, ActingUser),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
+ end;
+ false ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
+ in_use -> reply({error, exclusive_consume_unavailable}, State);
+ ok -> Consumers1 = rabbit_queue_consumers:add(
+ ChPid, ConsumerTag, NoAck,
+ LimiterPid, LimiterActive,
+ PrefetchCount, Args, is_empty(State),
+ ActingUser, Consumers),
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> Holder
+ end,
+ State1 = State#q{consumers = Consumers1,
+ has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ QName = qname(State1),
+ AckRequired = not NoAck,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName,
+ PrefetchCount, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, PrefetchCount,
+ Args, none, ActingUser),
+ notify_decorators(State1),
+ reply(ok, run_message_queue(State1))
+ end
end;
+
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Holder}) ->
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Holder,
+ exclusive_consumer_on = ExclusiveConsumerOn }) ->
ok = maybe_send_reply(ChPid, OkMsg),
case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of
not_found ->
reply(ok, State);
Consumers1 ->
- Holder1 = case Holder of
- {ChPid, ConsumerTag} -> none;
- _ -> Holder
- end,
+ Holder1 = case ExclusiveConsumerOn of
+ true ->
+ case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, Holder) of
+ true ->
+ case rabbit_queue_consumers:get_consumer(Consumers1) of
+ undefined -> none;
+ Consumer -> Consumer
+ end;
+ false ->
+ Holder
+ end;
+ false ->
+ case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end
+ end,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 0fe3065fe8..c9192a1175 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,10 +18,10 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
- credit/6, utilisation/1]).
+ credit/6, utilisation/1, is_same/3, get_consumer/1, get/3]).
%%----------------------------------------------------------------------------
@@ -55,6 +55,9 @@
use :: {'inactive',
time_micros(), time_micros(), ratio()} |
{'active', time_micros(), ratio()}}.
+-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(),
+ prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(),
+ user::rabbit_types:username()}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@@ -79,7 +82,8 @@
state()}.
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
- rabbit_amqqueue:name(), state()) ->
+ rabbit_amqqueue:name(), state(), boolean(),
+ none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
@@ -187,10 +191,24 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-
+deliver(FetchFun, QName, State, ExclusiveConsumerIsOn, ExclusiveConsumer) ->
+ deliver(FetchFun, QName, false, State, ExclusiveConsumerIsOn, ExclusiveConsumer).
+
+deliver(_FetchFun, _QName, false, State, true, none) ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) ->
+ case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of
+ {delivered, R} ->
+ {delivered, false, R, State};
+ undelivered ->
+ {ChPid, Consumer} = ExclusiveConsumer,
+ Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
+ {undelivered, true,
+ State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ end;
deliver(FetchFun, QName, ConsumersChanged,
- State = #state{consumers = Consumers}) ->
+ State = #state{consumers = Consumers}, false, _ExclusiveConsumer) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged,
@@ -203,7 +221,7 @@ deliver(FetchFun, QName, ConsumersChanged,
Tail)}};
undelivered ->
deliver(FetchFun, QName, true,
- State#state{consumers = Tail})
+ State#state{consumers = Tail}, false, _ExclusiveConsumer)
end
end.
@@ -353,6 +371,26 @@ utilisation(#state{use = {active, Since, Avg}}) ->
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) ->
+ true;
+is_same(_ChPid, _ConsumerTag, _Consumer) ->
+ false.
+
+get_consumer(#state{consumers = Consumers}) ->
+ case priority_queue:out_p(Consumers) of
+ {{value, Consumer, _Priority}, _Tail} -> Consumer;
+ {empty, _} -> undefined
+ end.
+
+get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
+ Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP == ChPid) and (CT == ConsumerTag)
+ end, Consumers),
+ case priority_queue:out_p(Consumers1) of
+ {empty, _} -> undefined;
+ {{value, Consumer, _Priority}, _Tail} -> Consumer
+ end.
+
%%----------------------------------------------------------------------------
parse_credit_args(Default, Args) ->
diff --git a/test/exclusive_consumer_SUITE.erl b/test/exclusive_consumer_SUITE.erl
new file mode 100644
index 0000000000..75397b548a
--- /dev/null
+++ b/test/exclusive_consumer_SUITE.erl
@@ -0,0 +1,134 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(exclusive_consumer_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ all_messages_go_to_one_consumer,
+ fallback_to_another_consumer_when_first_one_is_cancelled
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+all_messages_go_to_one_consumer(Config) ->
+ {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
+ Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]},
+ NbMessages = 5,
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare),
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages)],
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(NbMessages, MessageCount),
+ ?assertEqual(2, maps:size(MessagesPerConsumer)),
+ ?assertEqual(NbMessages, maps:get(CTag1, MessagesPerConsumer)),
+ ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
+ {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
+ Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]},
+ NbMessages = 10,
+ #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare),
+ ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]),
+ #'basic.consume_ok'{consumer_tag = CTag1} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = CTag2} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+ #'basic.consume_ok'{consumer_tag = _CTag3} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)],
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)],
+
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}),
+
+ amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
+
+ receive
+ {consumer_done, {MessagesPerConsumer, MessageCount}} ->
+ ?assertEqual(NbMessages, MessageCount),
+ ?assertEqual(3, maps:size(MessagesPerConsumer)),
+ ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)),
+ Counts = maps:values(MessagesPerConsumer),
+ ?assert(lists:member(NbMessages div 2, Counts)),
+ ?assert(lists:member(NbMessages div 2 - 1, Counts)),
+ ?assert(lists:member(1, Counts))
+ after 1000 ->
+ throw(failed)
+ end,
+
+ amqp_connection:close(C),
+ ok.
+
+consume({Parent, State, 0}) ->
+ Parent ! {consumer_done, State};
+consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) ->
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} ->
+ consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown});
+ {#'basic.deliver'{consumer_tag = CTag}, _Content} ->
+ consume({Parent,
+ {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer),
+ MessageCount + 1},
+ CountDown - 1});
+ #'basic.cancel_ok'{} ->
+ consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown})
+ after 500 ->
+ exit(consumer_timeout)
+ end. \ No newline at end of file
diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl
new file mode 100644
index 0000000000..08d12e7ec5
--- /dev/null
+++ b/test/unit_queue_consumers_SUITE.erl
@@ -0,0 +1,102 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(unit_queue_consumers_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ is_same,
+ get_consumer,
+ get
+ ].
+
+is_same(_Config) ->
+ ?assertEqual(
+ true,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"1">>)
+ )),
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(self(), <<"2">>)
+ )),
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ ?assertEqual(
+ false,
+ rabbit_queue_consumers:is_same(
+ self(), <<"1">>,
+ consumer(Pid, <<"1">>)
+ )),
+ ok.
+
+get(_Config) ->
+ Pid = spawn(?MODULE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {Pid, {consumer, <<"2">>, _, _, _, _}} =
+ rabbit_queue_consumers:get(Pid, <<"2">>, State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(self(), <<"2">>, State)
+ ),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get(Pid, <<"1">>, State)
+ ),
+ ok.
+
+get_consumer(_Config) ->
+ Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []),
+ Pid ! whatever,
+ State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])),
+ {_Pid, {consumer, _, _, _, _, _}} =
+ rabbit_queue_consumers:get_consumer(State),
+ ?assertEqual(
+ undefined,
+ rabbit_queue_consumers:get_consumer(state(consumers([])))
+ ),
+ ok.
+
+consumers([]) ->
+ priority_queue:new();
+consumers(Consumers) ->
+ consumers(Consumers, priority_queue:new()).
+
+consumers([H], Q) ->
+ priority_queue:in(H, Q);
+consumers([H | T], Q) ->
+ consumers(T, priority_queue:in(H, Q)).
+
+
+consumer(Pid, ConsumerTag) ->
+ {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}.
+
+state(Consumers) ->
+ {state, Consumers, {}}.
+
+function_for_process() ->
+ receive
+ _ -> ok
+ end. \ No newline at end of file