diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-22 14:46:46 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-10-22 14:46:46 +0200 |
| commit | a59ee6e05bb7c32e0886eb05ebe5197a095b49eb (patch) | |
| tree | e9d69c6c8de47c5097f8617c324fca1b3c4f42e6 | |
| parent | f9d4907acf96e99634352248625505d2cb6c34fa (diff) | |
| download | rabbitmq-server-git-a59ee6e05bb7c32e0886eb05ebe5197a095b49eb.tar.gz | |
Implement exclusive consumer (WIP)
This commit introduces exclusive consumer. A queue is declared with the
x-exclusive-consumer argument set to true and it will dispatch messages
to only one consumer at a time. This exclusive consumer is typically
the first one to consume from the queue. Other consumers are simply
ignored. If the exclusive consumer is cancelled, another consumer is
picked up randomly.
Subsequent commits will handle other cases than simple cancellation
(e.g. the owning channel of the exclusive consumer is closed).
[#161090309]
References #1743
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 148 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 52 | ||||
| -rw-r--r-- | test/exclusive_consumer_SUITE.erl | 134 | ||||
| -rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 102 |
5 files changed, 394 insertions, 51 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ca4fb1c019..539cc7287d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -575,7 +575,8 @@ declare_args() -> {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, - {<<"x-queue-mode">>, fun check_queue_mode/2}]. + {<<"x-queue-mode">>, fun check_queue_mode/2}, + {<<"x-exclusive-consumer">>, fun check_exclusive_consumer_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -616,6 +617,12 @@ check_max_priority_arg({Type, Val}, Args) -> Error -> Error end. +check_exclusive_consumer_arg({Type, Val}, Args) -> + case check_bool_arg({Type, Val}, Args) of + ok -> ok; + Error -> Error + end. + %% Note that the validity of x-dead-letter-exchange is already verified %% by rabbit_channel's queue.declare handler. check_dlxname_arg({longstr, _}, _) -> ok; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 334980bf49..d6e0296e83 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,7 @@ -record(q, { %% an #amqqueue record q, - %% none | {exclusive consumer channel PID, consumer tag} + %% none | {exclusive consumer channel PID, consumer tag} | {exclusive consumer channel PID, consumer} exclusive_consumer, %% Set to true if a queue has ever had a consumer. %% This is used to determine when to delete auto-delete queues. @@ -94,7 +94,8 @@ %% example. mirroring_policy_version = 0, %% running | flow | idle - status + status, + exclusive_consumer_on }). %%---------------------------------------------------------------------------- @@ -155,15 +156,20 @@ init(Q) -> ?MODULE}. init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0, - overflow = 'drop-head'}, + ExclusiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-exclusive-consumer">>) of + {bool, true} -> true; + _ -> false + end, + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0, + overflow = 'drop-head', + exclusive_consumer_on = ExclusiveConsumerOn}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -545,7 +551,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{consumers = Consumers}) -> +assert_invariant(#q{exclusive_consumer_on = true}) -> + %% queue may contain messages and have available consumers with exclusive consumer + ok; +assert_invariant(State = #q{consumers = Consumers, exclusive_consumer_on = false}) -> true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). @@ -619,7 +628,8 @@ run_message_queue(ActiveConsumersChanged, State) -> true -> maybe_notify_decorators(ActiveConsumersChanged, State); false -> case rabbit_queue_consumers:deliver( fun(AckRequired) -> fetch(AckRequired, State) end, - qname(State), State#q.consumers) of + qname(State), State#q.consumers, + State#q.exclusive_consumer_on, State#q.exclusive_consumer) of {delivered, ActiveConsumersChanged1, State1, Consumers} -> run_message_queue( ActiveConsumersChanged or ActiveConsumersChanged1, @@ -645,7 +655,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} - end, qname(State), State#q.consumers) of + end, qname(State), State#q.consumers, State#q.exclusive_consumer_on, State#q.exclusive_consumer) of {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -1204,47 +1214,99 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser}, - _From, State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> - case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> reply({error, exclusive_consume_unavailable}, State); - ok -> Consumers1 = rabbit_queue_consumers:add( - ChPid, ConsumerTag, NoAck, - LimiterPid, LimiterActive, - PrefetchCount, Args, is_empty(State), - ActingUser, Consumers), - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{consumers = Consumers1, - has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, + _From, State = #q{consumers = Consumers, + exclusive_consumer = Holder, + exclusive_consumer_on = ExclusiveConsumerOn}) -> + case ExclusiveConsumerOn of + true -> + case ExclusiveConsume of + true -> + reply({error, exclusive_consume_unavailable}, State); + false -> + Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + + State1 = case Holder of + none -> + NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), + State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = NewConsumer}; + _ -> + State#q{consumers = Consumers1, + has_had_consumers = true} + end, ok = maybe_send_reply(ChPid, OkMsg), - QName = qname(State1), - AckRequired = not NoAck, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, PrefetchCount, - Args, none, ActingUser), + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), notify_decorators(State1), reply(ok, run_message_queue(State1)) + end; + false -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), + notify_decorators(State1), + reply(ok, run_message_queue(State1)) + end end; + handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Holder, + exclusive_consumer_on = ExclusiveConsumerOn }) -> ok = maybe_send_reply(ChPid, OkMsg), case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); Consumers1 -> - Holder1 = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, + Holder1 = case ExclusiveConsumerOn of + true -> + case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, Holder) of + true -> + case rabbit_queue_consumers:get_consumer(Consumers1) of + undefined -> none; + Consumer -> Consumer + end; + false -> + Holder + end; + false -> + case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end + end, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0fe3065fe8..c9192a1175 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,10 +18,10 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/3, + send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, - credit/6, utilisation/1]). + credit/6, utilisation/1, is_same/3, get_consumer/1, get/3]). %%---------------------------------------------------------------------------- @@ -55,6 +55,9 @@ use :: {'inactive', time_micros(), time_micros(), ratio()} | {'active', time_micros(), ratio()}}. +-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(), + prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(), + user::rabbit_types:username()}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). @@ -79,7 +82,8 @@ state()}. -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), - rabbit_amqqueue:name(), state()) -> + rabbit_amqqueue:name(), state(), boolean(), + none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) -> {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. @@ -187,10 +191,24 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). - +deliver(FetchFun, QName, State, ExclusiveConsumerIsOn, ExclusiveConsumer) -> + deliver(FetchFun, QName, false, State, ExclusiveConsumerIsOn, ExclusiveConsumer). + +deliver(_FetchFun, _QName, false, State, true, none) -> + {undelivered, false, + State#state{use = update_use(State#state.use, inactive)}}; +deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) -> + case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of + {delivered, R} -> + {delivered, false, R, State}; + undelivered -> + {ChPid, Consumer} = ExclusiveConsumer, + Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), + {undelivered, true, + State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} + end; deliver(FetchFun, QName, ConsumersChanged, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers}, false, _ExclusiveConsumer) -> case priority_queue:out_p(Consumers) of {empty, _} -> {undelivered, ConsumersChanged, @@ -203,7 +221,7 @@ deliver(FetchFun, QName, ConsumersChanged, Tail)}}; undelivered -> deliver(FetchFun, QName, true, - State#state{consumers = Tail}) + State#state{consumers = Tail}, false, _ExclusiveConsumer) end end. @@ -353,6 +371,26 @@ utilisation(#state{use = {active, Since, Avg}}) -> utilisation(#state{use = {inactive, Since, Active, Avg}}) -> use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg). +is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) -> + true; +is_same(_ChPid, _ConsumerTag, _Consumer) -> + false. + +get_consumer(#state{consumers = Consumers}) -> + case priority_queue:out_p(Consumers) of + {{value, Consumer, _Priority}, _Tail} -> Consumer; + {empty, _} -> undefined + end. + +get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> + Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP == ChPid) and (CT == ConsumerTag) + end, Consumers), + case priority_queue:out_p(Consumers1) of + {empty, _} -> undefined; + {{value, Consumer, _Priority}, _Tail} -> Consumer + end. + %%---------------------------------------------------------------------------- parse_credit_args(Default, Args) -> diff --git a/test/exclusive_consumer_SUITE.erl b/test/exclusive_consumer_SUITE.erl new file mode 100644 index 0000000000..75397b548a --- /dev/null +++ b/test/exclusive_consumer_SUITE.erl @@ -0,0 +1,134 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(exclusive_consumer_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +all_messages_go_to_one_consumer(Config) -> + {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]}, + NbMessages = 5, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages)], + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(NbMessages, MessageCount), + ?assertEqual(2, maps:size(MessagesPerConsumer)), + ?assertEqual(NbMessages, maps:get(CTag1, MessagesPerConsumer)), + ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> + {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]}, + NbMessages = 10, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = _CTag3} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)], + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)], + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(NbMessages, MessageCount), + ?assertEqual(3, maps:size(MessagesPerConsumer)), + ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)), + Counts = maps:values(MessagesPerConsumer), + ?assert(lists:member(NbMessages div 2, Counts)), + ?assert(lists:member(NbMessages div 2 - 1, Counts)), + ?assert(lists:member(1, Counts)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +consume({Parent, State, 0}) -> + Parent ! {consumer_done, State}; +consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown}); + {#'basic.deliver'{consumer_tag = CTag}, _Content} -> + consume({Parent, + {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}, + CountDown - 1}); + #'basic.cancel_ok'{} -> + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) + after 500 -> + exit(consumer_timeout) + end.
\ No newline at end of file diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl new file mode 100644 index 0000000000..08d12e7ec5 --- /dev/null +++ b/test/unit_queue_consumers_SUITE.erl @@ -0,0 +1,102 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(unit_queue_consumers_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + is_same, + get_consumer, + get + ]. + +is_same(_Config) -> + ?assertEqual( + true, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"1">>) + )), + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"2">>) + )), + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(Pid, <<"1">>) + )), + ok. + +get(_Config) -> + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {Pid, {consumer, <<"2">>, _, _, _, _}} = + rabbit_queue_consumers:get(Pid, <<"2">>, State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(self(), <<"2">>, State) + ), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(Pid, <<"1">>, State) + ), + ok. + +get_consumer(_Config) -> + Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {_Pid, {consumer, _, _, _, _, _}} = + rabbit_queue_consumers:get_consumer(State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get_consumer(state(consumers([]))) + ), + ok. + +consumers([]) -> + priority_queue:new(); +consumers(Consumers) -> + consumers(Consumers, priority_queue:new()). + +consumers([H], Q) -> + priority_queue:in(H, Q); +consumers([H | T], Q) -> + consumers(T, priority_queue:in(H, Q)). + + +consumer(Pid, ConsumerTag) -> + {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}. + +state(Consumers) -> + {state, Consumers, {}}. + +function_for_process() -> + receive + _ -> ok + end.
\ No newline at end of file |
