diff options
| -rw-r--r-- | include/rabbit.hrl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 192 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 122 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 |
5 files changed, 184 insertions, 193 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 145f6104ae..0d75310b11 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,8 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, + arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -104,11 +105,12 @@ write :: regexp(), read :: regexp()}). -type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). + #amqqueue{name :: queue_name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: maybe(pid()), + arguments :: amqp_table(), + pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7b88c45d26..483b5a93b7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/5, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -41,8 +41,7 @@ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). --export([claim_queue/2]). --export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -66,8 +65,8 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> - amqqueue()). +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), + maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -97,14 +96,12 @@ -spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), +-spec(basic_consume/7 :: + (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> - 'ok' | {'error', 'queue_owned_by_another_connection' | - 'exclusive_consume_unavailable'}). + 'ok' | {'error', 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). @@ -148,11 +145,12 @@ recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. -declare(QueueName, Durable, AutoDelete, Args) -> +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of not_found -> rabbit_misc:not_found(QueueName); @@ -298,15 +296,12 @@ limit_all(QPids, ChPid, LimiterPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - delegate_call(QPid, {claim_queue, ReaderPid}, infinity). - basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate_call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). @@ -324,19 +319,21 @@ flush_all(QPids, ChPid) -> delegate:invoke_no_result( QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). +internal_delete1(QueueName) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName). + internal_delete(QueueName) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - %% we want to execute some things, as - %% decided by rabbit_exchange, after the - %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName) + [_] -> internal_delete1(QueueName) end end) of Err = {error, _} -> Err; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f12e1b70f8..a176dc4643 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -50,7 +50,6 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, backing_queue, @@ -104,7 +103,6 @@ init(Q) -> {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, - owner = none, exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, @@ -433,10 +431,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -488,10 +482,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(owner_pid, #q{owner = none}) -> +i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) -> - ReaderPid; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + ExclusiveOwner; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -520,25 +514,50 @@ i(Item, _) -> %--------------------------------------------------------------------------- handle_call({init, Recover}, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, + exclusive_owner = ExclusiveOwner}, backing_queue = BQ, backing_queue_state = undefined}) -> - %% TODO: If we're exclusively owned && our owner isn't alive && - %% Recover then we should BQ:init and then {stop, normal, - %% not_found, State}, relying on terminate to delete the queue. - case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> - {stop, normal, not_found, State}; - Q -> - gen_server2:reply(From, Q), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), - {rabbit_amqqueue, set_ram_duration_target, [self()]}), - noreply(State#q{backing_queue_state = - BQ:init(QName, IsDurable, Recover)}); - Q1 -> - {stop, normal, Q1, State} + Declare = + fun() -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + noreply( + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end + end, + + case ExclusiveOwner of + none -> + Declare(); + Owner -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> + erlang:monitor(process, Owner), + Declare(); + _ -> + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", + [QName]) + end, + %% Rely on terminate to delete the queue. + {stop, normal, not_found, + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}} + end end; handle_call(info, _From, State) -> @@ -613,51 +632,44 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder}) -> - case check_queue_owner(Owner, ReaderPid) of - mismatch -> - reply({error, queue_owned_by_another_connection}, State); + _From, State = #q{exclusive_consumer = ExistingHolder}) -> + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of + in_use -> + reply({error, exclusive_consume_unavailable}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), - case ConsumerCount of - 0 -> ok = rabbit_limiter:register(LimiterPid, self()); - _ -> ok - end, - ExclusiveConsumer = case ExclusiveConsume of - true -> {ChPid, ConsumerTag}; - false -> ExistingHolder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) - end + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), + ok = case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok + end, + ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + State1 = State#q{has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_message_queue( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -713,29 +725,6 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({claim_queue, ReaderPid}, _From, - State = #q{owner = Owner, exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - MonitorRef = erlang:monitor(process, ReaderPid), - reply(ok, State#q{owner = {ReaderPid, MonitorRef}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) - end; - handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -825,19 +814,10 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, NewState} -> noreply(NewState); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a48db9c8b3..f23b6d9c7f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -298,6 +298,26 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +with_exclusive_access_or_die(QName, ReaderPid, F) -> + case rabbit_amqqueue:with_or_die( + QName, fun(Q) -> case Q of + #amqqueue{exclusive_owner = none} -> + F(Q); + #amqqueue{exclusive_owner = ReaderPid} -> + F(Q); + _ -> + {error, wrong_exclusive_owner} + end + end) of + {error, wrong_exclusive_owner} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QName)]); + Else -> + Else + end. + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -483,11 +503,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -497,14 +517,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin, dict:store(ActualConsumerTag, QueueName, ConsumerMapping)}}; - {error, queue_owned_by_another_connection} -> - %% The spec is silent on which exception to use - %% here. This seems reasonable? - %% FIXME: check this - - rabbit_misc:protocol_error( - resource_locked, "~s owned by another connection", - [rabbit_misc:rs(QueueName)]); {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -674,34 +686,38 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim + nowait = NoWait, + arguments = Args}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + %% We use this in both branches, because queue_declare may yet return an + %% existing queue. Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q + fun(Q = #amqqueue{name = QueueName}) -> + case Q of + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + #amqqueue{exclusive_owner = Owner} -> + check_configure_permitted(QueueName, State), + Q; + %% exclusivity trumps non-equivalence arbitrarily + #amqqueue{} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -713,34 +729,32 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner)); + #amqqueue{} = Other -> Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -773,11 +787,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 76ebd982f4..7afa731663 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -746,17 +746,15 @@ test_user_management() -> passed. test_server_status() -> - %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, []) || + false, false, [], none) || Name <- [<<"foo">>, <<"bar">>]], - ok = rabbit_amqqueue:claim_queue(Q, self()), - ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), %% list queues |
