diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 148 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 52 | ||||
| -rw-r--r-- | test/exclusive_consumer_SUITE.erl | 134 | ||||
| -rw-r--r-- | test/unit_queue_consumers_SUITE.erl | 102 |
5 files changed, 394 insertions, 51 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ca4fb1c019..539cc7287d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -575,7 +575,8 @@ declare_args() -> {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, - {<<"x-queue-mode">>, fun check_queue_mode/2}]. + {<<"x-queue-mode">>, fun check_queue_mode/2}, + {<<"x-exclusive-consumer">>, fun check_exclusive_consumer_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -616,6 +617,12 @@ check_max_priority_arg({Type, Val}, Args) -> Error -> Error end. +check_exclusive_consumer_arg({Type, Val}, Args) -> + case check_bool_arg({Type, Val}, Args) of + ok -> ok; + Error -> Error + end. + %% Note that the validity of x-dead-letter-exchange is already verified %% by rabbit_channel's queue.declare handler. check_dlxname_arg({longstr, _}, _) -> ok; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 334980bf49..d6e0296e83 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,7 @@ -record(q, { %% an #amqqueue record q, - %% none | {exclusive consumer channel PID, consumer tag} + %% none | {exclusive consumer channel PID, consumer tag} | {exclusive consumer channel PID, consumer} exclusive_consumer, %% Set to true if a queue has ever had a consumer. %% This is used to determine when to delete auto-delete queues. @@ -94,7 +94,8 @@ %% example. mirroring_policy_version = 0, %% running | flow | idle - status + status, + exclusive_consumer_on }). %%---------------------------------------------------------------------------- @@ -155,15 +156,20 @@ init(Q) -> ?MODULE}. init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - consumers = rabbit_queue_consumers:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0, - overflow = 'drop-head'}, + ExclusiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-exclusive-consumer">>) of + {bool, true} -> true; + _ -> false + end, + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + consumers = rabbit_queue_consumers:new(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0, + overflow = 'drop-head', + exclusive_consumer_on = ExclusiveConsumerOn}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -545,7 +551,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(State = #q{consumers = Consumers}) -> +assert_invariant(#q{exclusive_consumer_on = true}) -> + %% queue may contain messages and have available consumers with exclusive consumer + ok; +assert_invariant(State = #q{consumers = Consumers, exclusive_consumer_on = false}) -> true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). @@ -619,7 +628,8 @@ run_message_queue(ActiveConsumersChanged, State) -> true -> maybe_notify_decorators(ActiveConsumersChanged, State); false -> case rabbit_queue_consumers:deliver( fun(AckRequired) -> fetch(AckRequired, State) end, - qname(State), State#q.consumers) of + qname(State), State#q.consumers, + State#q.exclusive_consumer_on, State#q.exclusive_consumer) of {delivered, ActiveConsumersChanged1, State1, Consumers} -> run_message_queue( ActiveConsumersChanged or ActiveConsumersChanged1, @@ -645,7 +655,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} - end, qname(State), State#q.consumers) of + end, qname(State), State#q.consumers, State#q.exclusive_consumer_on, State#q.exclusive_consumer) of {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -1204,47 +1214,99 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser}, - _From, State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> - case check_exclusive_access(Holder, ExclusiveConsume, State) of - in_use -> reply({error, exclusive_consume_unavailable}, State); - ok -> Consumers1 = rabbit_queue_consumers:add( - ChPid, ConsumerTag, NoAck, - LimiterPid, LimiterActive, - PrefetchCount, Args, is_empty(State), - ActingUser, Consumers), - ExclusiveConsumer = - if ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> Holder - end, - State1 = State#q{consumers = Consumers1, - has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, + _From, State = #q{consumers = Consumers, + exclusive_consumer = Holder, + exclusive_consumer_on = ExclusiveConsumerOn}) -> + case ExclusiveConsumerOn of + true -> + case ExclusiveConsume of + true -> + reply({error, exclusive_consume_unavailable}, State); + false -> + Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + + State1 = case Holder of + none -> + NewConsumer = rabbit_queue_consumers:get(ChPid, ConsumerTag, Consumers1), + State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = NewConsumer}; + _ -> + State#q{consumers = Consumers1, + has_had_consumers = true} + end, ok = maybe_send_reply(ChPid, OkMsg), - QName = qname(State1), - AckRequired = not NoAck, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, - PrefetchCount, Args), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, PrefetchCount, - Args, none, ActingUser), + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), notify_decorators(State1), reply(ok, run_message_queue(State1)) + end; + false -> + case check_exclusive_access(Holder, ExclusiveConsume, State) of + in_use -> reply({error, exclusive_consume_unavailable}, State); + ok -> Consumers1 = rabbit_queue_consumers:add( + ChPid, ConsumerTag, NoAck, + LimiterPid, LimiterActive, + PrefetchCount, Args, is_empty(State), + ActingUser, Consumers), + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> Holder + end, + State1 = State#q{consumers = Consumers1, + has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + QName = qname(State1), + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, QName, + PrefetchCount, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, PrefetchCount, + Args, none, ActingUser), + notify_decorators(State1), + reply(ok, run_message_queue(State1)) + end end; + handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Holder}) -> + State = #q{consumers = Consumers, + exclusive_consumer = Holder, + exclusive_consumer_on = ExclusiveConsumerOn }) -> ok = maybe_send_reply(ChPid, OkMsg), case rabbit_queue_consumers:remove(ChPid, ConsumerTag, Consumers) of not_found -> reply(ok, State); Consumers1 -> - Holder1 = case Holder of - {ChPid, ConsumerTag} -> none; - _ -> Holder - end, + Holder1 = case ExclusiveConsumerOn of + true -> + case rabbit_queue_consumers:is_same(ChPid, ConsumerTag, Holder) of + true -> + case rabbit_queue_consumers:get_consumer(Consumers1) of + undefined -> none; + Consumer -> Consumer + end; + false -> + Holder + end; + false -> + case Holder of + {ChPid, ConsumerTag} -> none; + _ -> Holder + end + end, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1), ActingUser), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0fe3065fe8..c9192a1175 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,10 +18,10 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/3, + send_drained/0, deliver/5, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, - credit/6, utilisation/1]). + credit/6, utilisation/1, is_same/3, get_consumer/1, get/3]). %%---------------------------------------------------------------------------- @@ -55,6 +55,9 @@ use :: {'inactive', time_micros(), time_micros(), ratio()} | {'active', time_micros(), ratio()}}. +-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(), + prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(), + user::rabbit_types:username()}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). @@ -79,7 +82,8 @@ state()}. -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), - rabbit_amqqueue:name(), state()) -> + rabbit_amqqueue:name(), state(), boolean(), + none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) -> {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. @@ -187,10 +191,24 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). - +deliver(FetchFun, QName, State, ExclusiveConsumerIsOn, ExclusiveConsumer) -> + deliver(FetchFun, QName, false, State, ExclusiveConsumerIsOn, ExclusiveConsumer). + +deliver(_FetchFun, _QName, false, State, true, none) -> + {undelivered, false, + State#state{use = update_use(State#state.use, inactive)}}; +deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, ExclusiveConsumer) -> + case deliver_to_consumer(FetchFun, ExclusiveConsumer, QName) of + {delivered, R} -> + {delivered, false, R, State}; + undelivered -> + {ChPid, Consumer} = ExclusiveConsumer, + Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers), + {undelivered, true, + State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}} + end; deliver(FetchFun, QName, ConsumersChanged, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers}, false, _ExclusiveConsumer) -> case priority_queue:out_p(Consumers) of {empty, _} -> {undelivered, ConsumersChanged, @@ -203,7 +221,7 @@ deliver(FetchFun, QName, ConsumersChanged, Tail)}}; undelivered -> deliver(FetchFun, QName, true, - State#state{consumers = Tail}) + State#state{consumers = Tail}, false, _ExclusiveConsumer) end end. @@ -353,6 +371,26 @@ utilisation(#state{use = {active, Since, Avg}}) -> utilisation(#state{use = {inactive, Since, Active, Avg}}) -> use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg). +is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) -> + true; +is_same(_ChPid, _ConsumerTag, _Consumer) -> + false. + +get_consumer(#state{consumers = Consumers}) -> + case priority_queue:out_p(Consumers) of + {{value, Consumer, _Priority}, _Tail} -> Consumer; + {empty, _} -> undefined + end. + +get(ChPid, ConsumerTag, #state{consumers = Consumers}) -> + Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP == ChPid) and (CT == ConsumerTag) + end, Consumers), + case priority_queue:out_p(Consumers1) of + {empty, _} -> undefined; + {{value, Consumer, _Priority}, _Tail} -> Consumer + end. + %%---------------------------------------------------------------------------- parse_credit_args(Default, Args) -> diff --git a/test/exclusive_consumer_SUITE.erl b/test/exclusive_consumer_SUITE.erl new file mode 100644 index 0000000000..75397b548a --- /dev/null +++ b/test/exclusive_consumer_SUITE.erl @@ -0,0 +1,134 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(exclusive_consumer_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + all_messages_go_to_one_consumer, + fallback_to_another_consumer_when_first_one_is_cancelled + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +all_messages_go_to_one_consumer(Config) -> + {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]}, + NbMessages = 5, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages)], + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(NbMessages, MessageCount), + ?assertEqual(2, maps:size(MessagesPerConsumer)), + ?assertEqual(NbMessages, maps:get(CTag1, MessagesPerConsumer)), + ?assertEqual(0, maps:get(CTag2, MessagesPerConsumer)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> + {C, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + Declare = #'queue.declare'{arguments = [{"x-exclusive-consumer", bool, true}]}, + NbMessages = 10, + #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, Declare), + ConsumerPid = spawn(?MODULE, consume, [{self(), {maps:new(), 0}, NbMessages}]), + #'basic.consume_ok'{consumer_tag = CTag1} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = CTag2} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + #'basic.consume_ok'{consumer_tag = _CTag3} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, ConsumerPid), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, NbMessages div 2)], + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag1}), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(NbMessages div 2 + 1, NbMessages - 1)], + + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag2}), + + amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), + + receive + {consumer_done, {MessagesPerConsumer, MessageCount}} -> + ?assertEqual(NbMessages, MessageCount), + ?assertEqual(3, maps:size(MessagesPerConsumer)), + ?assertEqual(NbMessages div 2, maps:get(CTag1, MessagesPerConsumer)), + Counts = maps:values(MessagesPerConsumer), + ?assert(lists:member(NbMessages div 2, Counts)), + ?assert(lists:member(NbMessages div 2 - 1, Counts)), + ?assert(lists:member(1, Counts)) + after 1000 -> + throw(failed) + end, + + amqp_connection:close(C), + ok. + +consume({Parent, State, 0}) -> + Parent ! {consumer_done, State}; +consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) -> + receive + #'basic.consume_ok'{consumer_tag = CTag} -> + consume({Parent, {maps:put(CTag, 0, MessagesPerConsumer), MessageCount}, CountDown}); + {#'basic.deliver'{consumer_tag = CTag}, _Content} -> + consume({Parent, + {maps:update_with(CTag, fun(V) -> V + 1 end, MessagesPerConsumer), + MessageCount + 1}, + CountDown - 1}); + #'basic.cancel_ok'{} -> + consume({Parent, {MessagesPerConsumer, MessageCount}, CountDown}) + after 500 -> + exit(consumer_timeout) + end.
\ No newline at end of file diff --git a/test/unit_queue_consumers_SUITE.erl b/test/unit_queue_consumers_SUITE.erl new file mode 100644 index 0000000000..08d12e7ec5 --- /dev/null +++ b/test/unit_queue_consumers_SUITE.erl @@ -0,0 +1,102 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(unit_queue_consumers_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + is_same, + get_consumer, + get + ]. + +is_same(_Config) -> + ?assertEqual( + true, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"1">>) + )), + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(self(), <<"2">>) + )), + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + ?assertEqual( + false, + rabbit_queue_consumers:is_same( + self(), <<"1">>, + consumer(Pid, <<"1">>) + )), + ok. + +get(_Config) -> + Pid = spawn(?MODULE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {Pid, {consumer, <<"2">>, _, _, _, _}} = + rabbit_queue_consumers:get(Pid, <<"2">>, State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(self(), <<"2">>, State) + ), + ?assertEqual( + undefined, + rabbit_queue_consumers:get(Pid, <<"1">>, State) + ), + ok. + +get_consumer(_Config) -> + Pid = spawn(unit_queue_consumers_SUITE, function_for_process, []), + Pid ! whatever, + State = state(consumers([consumer(self(), <<"1">>), consumer(Pid, <<"2">>), consumer(self(), <<"3">>)])), + {_Pid, {consumer, _, _, _, _, _}} = + rabbit_queue_consumers:get_consumer(State), + ?assertEqual( + undefined, + rabbit_queue_consumers:get_consumer(state(consumers([]))) + ), + ok. + +consumers([]) -> + priority_queue:new(); +consumers(Consumers) -> + consumers(Consumers, priority_queue:new()). + +consumers([H], Q) -> + priority_queue:in(H, Q); +consumers([H | T], Q) -> + consumers(T, priority_queue:in(H, Q)). + + +consumer(Pid, ConsumerTag) -> + {Pid, {consumer, ConsumerTag, true, 1, [], <<"guest">>}}. + +state(Consumers) -> + {state, Consumers, {}}. + +function_for_process() -> + receive + _ -> ok + end.
\ No newline at end of file |
