diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_amqqueue.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 1889 |
1 files changed, 1889 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl new file mode 100644 index 0000000000..cd5f894680 --- /dev/null +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -0,0 +1,1889 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_amqqueue). + +-export([warn_file_limit/0]). +-export([recover/1, stop/1, start/1, declare/6, declare/7, + delete_immediately/1, delete_exclusive/2, delete/4, purge/1, + forget_all_durable/1]). +-export([pseudo_queue/2, pseudo_queue/3, immutable/1]). +-export([lookup/1, lookup_many/1, not_found_or_absent/1, not_found_or_absent_dirty/1, + with/2, with/3, with_or_die/2, + assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, + requeue/3, ack/3, reject/4]). +-export([not_found/1, absent/2]). +-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, + emit_info_all/5, list_local/1, info_local/1, + emit_info_local/4, emit_info_down/4]). +-export([count/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, + list_local_names_down/0, list_with_possible_retry/1]). +-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). +-export([force_event_refresh/1, notify_policy_changed/1]). +-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). +-export([basic_get/5, basic_consume/12, basic_cancel/5, notify_decorators/1]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). +-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). +-export([on_node_up/1, on_node_down/1]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). +-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). +-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). +-export([has_synchronised_mirrors_online/1]). +-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, + list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, + list_local_mirrored_classic_without_synchronised_mirrors/0, + list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]). +-export([ensure_rabbit_queue_record_is_initialized/1]). +-export([format/1]). +-export([delete_immediately_by_resource/1]). +-export([delete_crashed/1, + delete_crashed/2, + delete_crashed_internal/2]). + +-export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). + +-export([rebalance/3]). +-export([collect_info_all/2]). + +-export([is_policy_applicable/2]). +-export([is_server_named_allowed/1]). + +-export([check_max_age/1]). +-export([get_queue_type/1]). + +%% internal +-export([internal_declare/2, internal_delete/2, run_backing_queue/3, + set_ram_duration_target/2, set_maximum_since_use/2, + emit_consumers_local/3, internal_delete/3]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("stdlib/include/qlc.hrl"). +-include("amqqueue.hrl"). + +-define(INTEGER_ARG_TYPES, [byte, short, signedint, long, + unsignedbyte, unsignedshort, unsignedint]). + +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + +-define(IS_CLASSIC(QPid), is_pid(QPid)). +-define(IS_QUORUM(QPid), is_tuple(QPid)). +%%---------------------------------------------------------------------------- + +-export_type([name/0, qmsg/0, absent_reason/0]). + +-type name() :: rabbit_types:r('queue'). + +-type qpids() :: [pid()]. +-type qlen() :: rabbit_types:ok(non_neg_integer()). +-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). +-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(), + boolean(), rabbit_types:message()}. +-type msg_id() :: non_neg_integer(). +-type ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}. +-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout. +-type queue_not_found() :: not_found. +-type queue_absent() :: {'absent', amqqueue:amqqueue(), absent_reason()}. +-type not_found_or_absent() :: queue_not_found() | queue_absent(). + +%%---------------------------------------------------------------------------- + +-define(CONSUMER_INFO_KEYS, + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + active, activity_status, arguments]). + +warn_file_limit() -> + DurableQueues = find_recoverable_queues(), + L = length(DurableQueues), + + %% if there are not enough file handles, the server might hang + %% when trying to recover queues, warn the user: + case file_handle_cache:get_limit() < L of + true -> + rabbit_log:warning( + "Recovering ~p queues, available file handles: ~p. Please increase max open file handles limit to at least ~p!~n", + [L, file_handle_cache:get_limit(), L]); + false -> + ok + end. + +-spec recover(rabbit_types:vhost()) -> + {Recovered :: [amqqueue:amqqueue()], + Failed :: [amqqueue:amqqueue()]}. +recover(VHost) -> + AllDurable = find_local_durable_queues(VHost), + rabbit_queue_type:recover(VHost, AllDurable). + +filter_pid_per_type(QPids) -> + lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). + +filter_resource_per_type(Resources) -> + Queues = [begin + {ok, Q} = lookup(Resource), + QPid = amqqueue:get_pid(Q), + {Resource, QPid} + end || Resource <- Resources], + lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). + +-spec stop(rabbit_types:vhost()) -> 'ok'. +stop(VHost) -> + %% Classic queues + ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + ok = BQ:stop(VHost), + rabbit_quorum_queue:stop(VHost). + +-spec start([amqqueue:amqqueue()]) -> 'ok'. + +start(Qs) -> + %% At this point all recovered queues and their bindings are + %% visible to routing, so now it is safe for them to complete + %% their initialisation (which may involve interacting with other + %% queues). + _ = [amqqueue:get_pid(Q) ! {self(), go} + || Q <- Qs, + %% All queues are supposed to be classic here. + amqqueue:is_classic(Q)], + ok. + +mark_local_durable_queues_stopped(VHost) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_mark_local_durable_queues_stopped(VHost), + do_mark_local_durable_queues_stopped(VHost)). + +do_mark_local_durable_queues_stopped(VHost) -> + Qs = find_local_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(amqqueue:set_state(Q, stopped)) + || Q <- Qs, amqqueue:get_type(Q) =:= rabbit_classic_queue, + amqqueue:get_state(Q) =/= stopped ] + end). + +find_local_durable_queues(VHost) -> + mnesia:async_dirty( + fun () -> + qlc:e( + qlc:q( + [Q || Q <- mnesia:table(rabbit_durable_queue), + amqqueue:get_vhost(Q) =:= VHost andalso + rabbit_queue_type:is_recoverable(Q) + ])) + end). + +find_recoverable_queues() -> + mnesia:async_dirty( + fun () -> + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + rabbit_queue_type:is_recoverable(Q)])) + end). + +-spec declare(name(), + boolean(), + boolean(), + rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), + rabbit_types:username()) -> + {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | + {'absent', amqqueue:amqqueue(), absent_reason()} | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser) -> + declare(QueueName, Durable, AutoDelete, Args, Owner, ActingUser, node()). + + +%% The Node argument suggests where the queue (master if mirrored) +%% should be. Note that in some cases (e.g. with "nodes" policy in +%% effect) this might not be possible to satisfy. + +-spec declare(name(), + boolean(), + boolean(), + rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), + rabbit_types:username(), + node()) -> + {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | + {'absent', amqqueue:amqqueue(), absent_reason()} | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, + Owner, ActingUser, Node) -> + ok = check_declare_arguments(QueueName, Args), + Type = get_queue_type(Args), + case rabbit_queue_type:is_enabled(Type) of + true -> + Q0 = amqqueue:new(QueueName, + none, + Durable, + AutoDelete, + Owner, + Args, + VHost, + #{user => ActingUser}, + Type), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(Q0)), + rabbit_queue_type:declare(Q, Node); + false -> + {protocol_error, internal_error, + "Cannot declare a queue '~s' of type '~s' on node '~s': " + "the corresponding feature flag is disabled", + [rabbit_misc:rs(QueueName), Type, Node]} + end. + +get_queue_type(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of + undefined -> + rabbit_queue_type:default(); + {_, V} -> + rabbit_queue_type:discover(V) + end. + +-spec internal_declare(amqqueue:amqqueue(), boolean()) -> + {created | existing, amqqueue:amqqueue()} | queue_absent(). + +internal_declare(Q, Recover) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_internal_declare(Q, Recover), + begin + Q1 = amqqueue:upgrade(Q), + do_internal_declare(Q1, Recover) + end). + +do_internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + ok = store_queue(amqqueue:set_state(Q, live)), + rabbit_misc:const({created, Q}) + end); +do_internal_declare(Q, false) -> + QueueName = amqqueue:get_name(Q), + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case not_found_or_absent(QueueName) of + not_found -> Q1 = rabbit_policy:set(Q), + Q2 = amqqueue:set_state(Q1, live), + ok = store_queue(Q2), + fun () -> {created, Q2} end; + {absent, _Q, _} = R -> rabbit_misc:const(R) + end; + [ExistingQ] -> + rabbit_misc:const({existing, ExistingQ}) + end + end). + +-spec update + (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> + 'not_found' | amqqueue:amqqueue(). + +update(Name, Fun) -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> + Durable = amqqueue:is_durable(Q), + Q1 = Fun(Q), + ok = mnesia:write(rabbit_queue, Q1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); + _ -> ok + end, + Q1; + [] -> + not_found + end. + +%% only really used for quorum queues to ensure the rabbit_queue record +%% is initialised +ensure_rabbit_queue_record_is_initialized(Q) -> + ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( + do_ensure_rabbit_queue_record_is_initialized(Q), + begin + Q1 = amqqueue:upgrade(Q), + do_ensure_rabbit_queue_record_is_initialized(Q1) + end). + +do_ensure_rabbit_queue_record_is_initialized(Q) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + ok = store_queue(Q), + rabbit_misc:const({ok, Q}) + end). + +-spec store_queue(amqqueue:amqqueue()) -> 'ok'. + +store_queue(Q) when ?amqqueue_is_durable(Q) -> + Q1 = amqqueue:reset_mirroring_and_decorators(Q), + ok = mnesia:write(rabbit_durable_queue, Q1, write), + store_queue_ram(Q); +store_queue(Q) when not ?amqqueue_is_durable(Q) -> + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +-spec update_decorators(name()) -> 'ok'. + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). + +-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> + 'ok'. + +policy_changed(Q1, Q2) -> + Decorators1 = amqqueue:get_decorators(Q1), + Decorators2 = amqqueue:get_decorators(Q2), + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + D1 = rabbit_queue_decorator:select(Decorators1), + D2 = rabbit_queue_decorator:select(Decorators2), + [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], + %% Make sure we emit a stats event even if nothing + %% mirroring-related has changed - the policy may have changed anyway. + notify_policy_changed(Q2). + +is_policy_applicable(QName, Policy) -> + case lookup(QName) of + {ok, Q} -> + rabbit_queue_type:is_policy_applicable(Q, Policy); + _ -> + %% Defaults to previous behaviour. Apply always + true + end. + +is_server_named_allowed(Args) -> + Type = get_queue_type(Args), + rabbit_queue_type:is_server_named_allowed(Type). + +-spec lookup + (name()) -> + rabbit_types:ok(amqqueue:amqqueue()) | + rabbit_types:error('not_found'); + ([name()]) -> + [amqqueue:amqqueue()]. + +lookup([]) -> []; %% optimisation +lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); +lookup(Name) -> + rabbit_misc:dirty_read({rabbit_queue, Name}). + +-spec lookup_many ([name()]) -> [amqqueue:amqqueue()]. + +lookup_many(Names) when is_list(Names) -> + lookup(Names). + +-spec not_found_or_absent(name()) -> not_found_or_absent(). + +not_found_or_absent(Name) -> + %% NB: we assume that the caller has already performed a lookup on + %% rabbit_queue and not found anything + case mnesia:read({rabbit_durable_queue, Name}) of + [] -> not_found; + [Q] -> {absent, Q, nodedown} %% Q exists on stopped node + end. + +-spec not_found_or_absent_dirty(name()) -> not_found_or_absent(). + +not_found_or_absent_dirty(Name) -> + %% We should read from both tables inside a tx, to get a + %% consistent view. But the chances of an inconsistency are small, + %% and only affect the error kind. + case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of + {error, not_found} -> not_found; + {ok, Q} -> {absent, Q, nodedown} + end. + +-spec get_rebalance_lock(pid()) -> + {true, {rebalance_queues, pid()}} | false. +get_rebalance_lock(Pid) when is_pid(Pid) -> + Id = {rebalance_queues, Pid}, + Nodes = [node()|nodes()], + %% Note that we're not re-trying. We want to immediately know + %% if a re-balance is taking place and stop accordingly. + case global:set_lock(Id, Nodes, 0) of + true -> + {true, Id}; + false -> + false + end. + +-spec rebalance('all' | 'quorum' | 'classic', binary(), binary()) -> + {ok, [{node(), pos_integer()}]} | {error, term()}. +rebalance(Type, VhostSpec, QueueSpec) -> + %% We have not yet acquired the rebalance_queues global lock. + maybe_rebalance(get_rebalance_lock(self()), Type, VhostSpec, QueueSpec). + +maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> + rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", + [Type, VhostSpec, QueueSpec]), + Running = rabbit_nodes:all_running(), + NumRunning = length(Running), + ToRebalance = [Q || Q <- rabbit_amqqueue:list(), + filter_per_type(Type, Q), + is_replicated(Q), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)], + NumToRebalance = length(ToRebalance), + ByNode = group_by_node(ToRebalance), + Rem = case (NumToRebalance rem NumRunning) of + 0 -> 0; + _ -> 1 + end, + MaxQueuesDesired = (NumToRebalance div NumRunning) + Rem, + Result = iterative_rebalance(ByNode, MaxQueuesDesired), + global:del_lock(Id), + rabbit_log:info("Finished queue rebalance operation"), + Result; +maybe_rebalance(false, _Type, _VhostSpec, _QueueSpec) -> + rabbit_log:warning("Queue rebalance operation is in progress, please wait."), + {error, rebalance_in_progress}. + +filter_per_type(all, _) -> + true; +filter_per_type(quorum, Q) -> + ?amqqueue_is_quorum(Q); +filter_per_type(classic, Q) -> + ?amqqueue_is_classic(Q). + +rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue; +rebalance_module(Q) when ?amqqueue_is_classic(Q) -> + rabbit_mirror_queue_misc. + +get_resource_name(#resource{name = Name}) -> + Name. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + +iterative_rebalance(ByNode, MaxQueuesDesired) -> + case maybe_migrate(ByNode, MaxQueuesDesired) of + {ok, Summary} -> + rabbit_log:info("All queue masters are balanced"), + {ok, Summary}; + {migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired); + {not_migrated, Other} -> + iterative_rebalance(Other, MaxQueuesDesired) + end. + +maybe_migrate(ByNode, MaxQueuesDesired) -> + maybe_migrate(ByNode, MaxQueuesDesired, maps:keys(ByNode)). + +maybe_migrate(ByNode, _, []) -> + {ok, maps:fold(fun(K, V, Acc) -> + {CQs, QQs} = lists:partition(fun({_, Q, _}) -> + ?amqqueue_is_classic(Q) + end, V), + [[{<<"Node name">>, K}, {<<"Number of quorum queues">>, length(QQs)}, + {<<"Number of classic queues">>, length(CQs)}] | Acc] + end, [], ByNode)}; +maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> + case maps:get(N, ByNode, []) of + [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> + Name = amqqueue:get_name(Q), + Module = rebalance_module(Q), + OtherNodes = Module:get_replicas(Q) -- [N], + case OtherNodes of + [] -> + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; + _ -> + [{Length, Destination} | _] = sort_by_number_of_queues(OtherNodes, ByNode), + rabbit_log:warning("Migrating queue ~p from node ~p with ~p queues to node ~p with ~p queues", + [Name, N, length(All), Destination, Length]), + case Module:transfer_leadership(Q, Destination) of + {migrated, NewNode} -> + rabbit_log:warning("Queue ~p migrated to ~p", [Name, NewNode]), + {migrated, update_migrated_queue(Destination, N, Queue, Queues, ByNode)}; + {not_migrated, Reason} -> + rabbit_log:warning("Error migrating queue ~p: ~p", [Name, Reason]), + {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)} + end + end; + [{_, _, true} | _] = All when length(All) > MaxQueuesDesired -> + rabbit_log:warning("Node ~p contains ~p queues, but all have already migrated. " + "Do nothing", [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes); + All -> + rabbit_log:warning("Node ~p only contains ~p queues, do nothing", + [N, length(All)]), + maybe_migrate(ByNode, MaxQueuesDesired, Nodes) + end. + +update_not_migrated_queue(N, {Entries, Q, _}, Queues, ByNode) -> + maps:update(N, Queues ++ [{Entries, Q, true}], ByNode). + +update_migrated_queue(NewNode, OldNode, {Entries, Q, _}, Queues, ByNode) -> + maps:update_with(NewNode, + fun(L) -> L ++ [{Entries, Q, true}] end, + [{Entries, Q, true}], maps:update(OldNode, Queues, ByNode)). + +sort_by_number_of_queues(Nodes, ByNode) -> + lists:keysort(1, + lists:map(fun(Node) -> + {num_queues(Node, ByNode), Node} + end, Nodes)). + +num_queues(Node, ByNode) -> + length(maps:get(Node, ByNode, [])). + +group_by_node(Queues) -> + ByNode = lists:foldl(fun(Q, Acc) -> + Module = rebalance_module(Q), + Length = Module:queue_length(Q), + maps:update_with(amqqueue:qnode(Q), + fun(L) -> [{Length, Q, false} | L] end, + [{Length, Q, false}], Acc) + end, #{}, Queues), + maps:map(fun(_K, V) -> lists:keysort(1, V) end, ByNode). + +-spec with(name(), + qfun(A), + fun((not_found_or_absent()) -> rabbit_types:channel_exit())) -> + A | rabbit_types:channel_exit(). + +with(Name, F, E) -> + with(Name, F, E, 2000). + +with(#resource{} = Name, F, E, RetriesLeft) -> + case lookup(Name) of + {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 -> + %% Something bad happened to that queue, we are bailing out + %% on processing current request. + E({absent, Q, timeout}); + {ok, Q} when ?amqqueue_state_is(Q, stopped) andalso RetriesLeft =:= 0 -> + %% The queue was stopped and not migrated + E({absent, Q, stopped}); + %% The queue process has crashed with unknown error + {ok, Q} when ?amqqueue_state_is(Q, crashed) -> + E({absent, Q, crashed}); + %% The queue process has been stopped by a supervisor. + %% In that case a synchronised mirror can take over + %% so we should retry. + {ok, Q} when ?amqqueue_state_is(Q, stopped) -> + %% The queue process was stopped by the supervisor + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + %% The queue is supposed to be active. + %% The master node can go away or queue can be killed + %% so we retry, waiting for a mirror to take over. + {ok, Q} when ?amqqueue_state_is(Q, live) -> + %% We check is_process_alive(QPid) in case we receive a + %% nodedown (for example) in F() that has nothing to do + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. + rabbit_misc:with_exit_handler( + fun () -> retry_wait(Q, F, E, RetriesLeft) end, + fun () -> F(Q) end); + {error, not_found} -> + E(not_found_or_absent_dirty(Name)) + end. + +-spec retry_wait(amqqueue:amqqueue(), + qfun(A), + fun((not_found_or_absent()) -> rabbit_types:channel_exit()), + non_neg_integer()) -> + A | rabbit_types:channel_exit(). + +retry_wait(Q, F, E, RetriesLeft) -> + Name = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + QState = amqqueue:get_state(Q), + case {QState, is_replicated(Q)} of + %% We don't want to repeat an operation if + %% there are no mirrors to migrate to + {stopped, false} -> + E({absent, Q, stopped}); + _ -> + case rabbit_mnesia:is_process_alive(QPid) of + true -> + % rabbitmq-server#1682 + % The old check would have crashed here, + % instead, log it and run the exit fun. absent & alive is weird, + % but better than crashing with badmatch,true + rabbit_log:debug("Unexpected alive queue process ~p~n", [QPid]), + E({absent, Q, alive}); + false -> + ok % Expected result + end, + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end. + +-spec with(name(), qfun(A)) -> + A | rabbit_types:error(not_found_or_absent()). + +with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). + +-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). + +with_or_die(Name, F) -> + with(Name, F, die_fun(Name)). + +-spec die_fun(name()) -> + fun((not_found_or_absent()) -> rabbit_types:channel_exit()). + +die_fun(Name) -> + fun (not_found) -> not_found(Name); + ({absent, Q, Reason}) -> absent(Q, Reason) + end. + +-spec not_found(name()) -> rabbit_types:channel_exit(). + +not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]). + +-spec absent(amqqueue:amqqueue(), absent_reason()) -> + rabbit_types:channel_exit(). + +absent(Q, AbsentReason) -> + QueueName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + IsDurable = amqqueue:is_durable(Q), + priv_absent(QueueName, QPid, IsDurable, AbsentReason). + +-spec priv_absent(name(), pid(), boolean(), absent_reason()) -> + rabbit_types:channel_exit(). + +priv_absent(QueueName, QPid, true, nodedown) -> + %% The assertion of durability is mainly there because we mention + %% durability in the error message. That way we will hopefully + %% notice if at some future point our logic changes s.t. we get + %% here with non-durable queues. + rabbit_misc:protocol_error( + not_found, + "home node '~s' of durable ~s is down or inaccessible", + [node(QPid), rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, stopped) -> + rabbit_misc:protocol_error( + not_found, + "~s process is stopped by supervisor", [rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, crashed) -> + rabbit_misc:protocol_error( + not_found, + "~s has crashed and failed to restart", [rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, timeout) -> + rabbit_misc:protocol_error( + not_found, + "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, QPid, _IsDurable, alive) -> + rabbit_misc:protocol_error( + not_found, + "failed to perform operation on ~s: its master replica ~w may be stopping or being demoted", + [rabbit_misc:rs(QueueName), QPid]). + +-spec assert_equivalence + (amqqueue:amqqueue(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> + 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). + +assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) -> + QName = amqqueue:get_name(Q), + DurableQ = amqqueue:is_durable(Q), + AutoDeleteQ = amqqueue:is_auto_delete(Q), + ok = check_exclusive_access(Q, Owner, strict), + ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable), + ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete), + ok = assert_args_equivalence(Q, Args1). + +-spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> + 'ok' | rabbit_types:channel_exit(). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(Q, Owner, _MatchType) + when ?amqqueue_exclusive_owner_is(Q, Owner) -> + ok; +check_exclusive_access(Q, _ReaderPid, lax) + when ?amqqueue_exclusive_owner_is(Q, none) -> + ok; +check_exclusive_access(Q, _ReaderPid, _MatchType) -> + QueueName = amqqueue:get_name(Q), + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s. It could be originally " + "declared on another connection or the exclusive property value does not " + "match that of the original declaration.", + [rabbit_misc:rs(QueueName)]). + +-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> + A | rabbit_types:channel_exit(). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + +assert_args_equivalence(Q, RequiredArgs) -> + QueueName = amqqueue:get_name(Q), + Args = amqqueue:get_arguments(Q), + rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, + [Key || {Key, _Fun} <- declare_args()]). + +check_declare_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, declare_args()). + +check_consume_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, consume_args()). + +check_arguments(QueueName, Args, Validators) -> + [case rabbit_misc:table_lookup(Args, Key) of + undefined -> ok; + TypeVal -> case Fun(TypeVal, Args) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s: ~255p", + [Key, rabbit_misc:rs(QueueName), + Error]) + end + end || {Key, Fun} <- Validators], + ok. + +declare_args() -> + [{<<"x-expires">>, fun check_expires_arg/2}, + {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, + {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2}, + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-priority">>, fun check_max_priority_arg/2}, + {<<"x-overflow">>, fun check_overflow/2}, + {<<"x-queue-mode">>, fun check_queue_mode/2}, + {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, + {<<"x-queue-type">>, fun check_queue_type/2}, + {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, + {<<"x-max-age">>, fun check_max_age_arg/2}, + {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}, + {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}, + {<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}]. + +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. + +check_int_arg({Type, _}, _) -> + case lists:member(Type, ?INTEGER_ARG_TYPES) of + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_bool_arg({bool, _}, _) -> ok; +check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + +check_expires_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> rabbit_misc:check_expiry(Val); + Error -> Error + end. + +check_message_ttl_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok -> rabbit_misc:check_expiry(Val); + Error -> Error + end. + +check_max_priority_arg({Type, Val}, Args) -> + case check_non_neg_int_arg({Type, Val}, Args) of + ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; + ok -> {error, {max_value_exceeded, Val}}; + Error -> Error + end. + +check_single_active_consumer_arg({Type, Val}, Args) -> + case check_bool_arg({Type, Val}, Args) of + ok -> ok; + Error -> Error + end. + +check_initial_cluster_size_arg({Type, Val}, Args) -> + case check_non_neg_int_arg({Type, Val}, Args) of + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> ok; + Error -> Error + end. + +check_max_age_arg({longstr, Val}, _Args) -> + case check_max_age(Val) of + {error, _} = E -> + E; + _ -> + ok + end; +check_max_age_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_max_age(MaxAge) -> + case re:run(MaxAge, "(^[0-9]*)(.*)", [{capture, all_but_first, list}]) of + {match, [Value, Unit]} -> + case list_to_integer(Value) of + I when I > 0 -> + case lists:member(Unit, ["Y", "M", "D", "h", "m", "s"]) of + true -> + Int = list_to_integer(Value), + Int * unit_value_in_ms(Unit); + false -> + {error, invalid_max_age} + end; + _ -> + {error, invalid_max_age} + end; + _ -> + {error, invalid_max_age} + end. + +unit_value_in_ms("Y") -> + 365 * unit_value_in_ms("D"); +unit_value_in_ms("M") -> + 30 * unit_value_in_ms("D"); +unit_value_in_ms("D") -> + 24 * unit_value_in_ms("h"); +unit_value_in_ms("h") -> + 3600 * unit_value_in_ms("s"); +unit_value_in_ms("m") -> + 60 * unit_value_in_ms("s"); +unit_value_in_ms("s") -> + 1000. + +%% Note that the validity of x-dead-letter-exchange is already verified +%% by rabbit_channel's queue.declare handler. +check_dlxname_arg({longstr, _}, _) -> ok; +check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + +check_dlxrk_arg({longstr, _}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of + undefined -> {error, routing_key_but_no_dlx_defined}; + _ -> ok + end; +check_dlxrk_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_overflow({longstr, Val}, _Args) -> + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of + true -> ok; + false -> {error, invalid_overflow} + end; +check_overflow({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_queue_leader_locator_arg({longstr, Val}, _Args) -> + case lists:member(Val, [<<"client-local">>, + <<"random">>, + <<"least-leaders">>]) of + true -> ok; + false -> {error, invalid_queue_locator_arg} + end; +check_queue_leader_locator_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_queue_mode({longstr, Val}, _Args) -> + case lists:member(Val, [<<"default">>, <<"lazy">>]) of + true -> ok; + false -> {error, invalid_queue_mode} + end; +check_queue_mode({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_queue_type({longstr, Val}, _Args) -> + case lists:member(Val, [<<"classic">>, <<"quorum">>, <<"stream">>]) of + true -> ok; + false -> {error, invalid_queue_type} + end; +check_queue_type({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +-spec list() -> [amqqueue:amqqueue()]. + +list() -> + list_with_possible_retry(fun do_list/0). + +do_list() -> + mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). + +-spec count() -> non_neg_integer(). + +count() -> + mnesia:table_info(rabbit_queue, size). + +-spec list_names() -> [rabbit_amqqueue:name()]. + +list_names() -> mnesia:dirty_all_keys(rabbit_queue). + +list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)]. + +list_local_names() -> + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. + +list_local_names_down() -> + [ amqqueue:get_name(Q) || Q <- list(), + is_down(Q), + is_local_to_node(amqqueue:get_pid(Q), node())]. + +is_down(Q) -> + try + info(Q, [state]) == [{state, down}] + catch + _:_ -> + true + end. + + +-spec sample_local_queues() -> [amqqueue:amqqueue()]. +sample_local_queues() -> sample_n_by_name(list_local_names(), 300). + +-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n_by_name([], _N) -> + []; +sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 -> + %% lists:nth/2 throws when position is > list length + M = erlang:min(N, length(Names)), + Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 -> + Acc; + (_, Acc) -> + Pick = lists:nth(rand:uniform(M), Names), + [Pick | Acc] + end, + [], lists:seq(1, M)), + lists:map(fun (Id) -> + {ok, Q} = rabbit_amqqueue:lookup(Id), + Q + end, + lists:usort(Ids)). + +-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n([], _N) -> + []; +sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> + Names = [amqqueue:get_name(Q) || Q <- Queues], + sample_n_by_name(Names, N). + + +-spec list_by_type(atom()) -> [amqqueue:amqqueue()]. + +list_by_type(classic) -> list_by_type(rabbit_classic_queue); +list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); +list_by_type(Type) -> + {atomic, Qs} = + mnesia:sync_transaction( + fun () -> + mnesia:match_object(rabbit_durable_queue, + amqqueue:pattern_match_on_type(Type), + read) + end), + Qs. + +-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()]. + +list_local_quorum_queue_names() -> + [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_quorum_queues() -> [amqqueue:amqqueue()]. +list_local_quorum_queues() -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_leaders() -> [amqqueue:amqqueue()]. +list_local_leaders() -> + [ Q || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. + +-spec list_local_followers() -> [amqqueue:amqqueue()]. +list_local_followers() -> + [Q + || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, + amqqueue:get_leader(Q) =/= node(), + rabbit_quorum_queue:is_recoverable(Q) + ]. + +-spec list_local_mirrored_classic_queues() -> [amqqueue:amqqueue()]. +list_local_mirrored_classic_queues() -> + [ Q || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + +-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. +list_local_mirrored_classic_names() -> + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + +-spec list_local_mirrored_classic_without_synchronised_mirrors() -> + [amqqueue:amqqueue()]. +list_local_mirrored_classic_without_synchronised_mirrors() -> + [ Q || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + %% filter out exclusive queues as they won't actually be mirrored + is_not_exclusive(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q), + not has_synchronised_mirrors_online(Q)]. + +-spec list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> + [#{binary => any()}]. +list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> + ClassicQs = list_local_mirrored_classic_without_synchronised_mirrors(), + [begin + #resource{name = Name} = amqqueue:get_name(Q), + #{ + <<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(amqqueue:get_name(Q))), + <<"name">> => Name, + <<"virtual_host">> => amqqueue:get_vhost(Q), + <<"type">> => <<"classic">> + } + end || Q <- ClassicQs]. + +is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> + Node =:= node(QPid); +is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> + Node =:= Leader. + +-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. + +list(VHostPath) -> + list(VHostPath, rabbit_queue). + +list(VHostPath, TableName) -> + list_with_possible_retry(fun() -> do_list(VHostPath, TableName) end). + +%% Not dirty_match_object since that would not be transactional when used in a +%% tx context +do_list(VHostPath, TableName) -> + mnesia:async_dirty( + fun () -> + mnesia:match_object( + TableName, + amqqueue:pattern_match_on_name(rabbit_misc:r(VHostPath, queue)), + read) + end). + +list_with_possible_retry(Fun) -> + %% amqqueue migration: + %% The `rabbit_queue` or `rabbit_durable_queue` tables + %% might be migrated between the time we query the pattern + %% (with the `amqqueue` module) and the time we call + %% `mnesia:dirty_match_object()`. This would lead to an empty list + %% (no object matching the now incorrect pattern), not a Mnesia + %% error. + %% + %% So if the result is an empty list and the version of the + %% `amqqueue` record changed in between, we retry the operation. + %% + %% However, we don't do this if inside a Mnesia transaction: we + %% could end up with a live lock between this started transaction + %% and the Mnesia table migration which is blocked (but the + %% rabbit_feature_flags lock is held). + AmqqueueRecordVersion = amqqueue:record_version_to_use(), + case Fun() of + [] -> + case mnesia:is_transaction() of + true -> + []; + false -> + case amqqueue:record_version_to_use() of + AmqqueueRecordVersion -> []; + _ -> Fun() + end + end; + Ret -> + Ret + end. + +-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. + +list_down(VHostPath) -> + case rabbit_vhost:exists(VHostPath) of + false -> []; + true -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([amqqueue:get_name(Q) || Q <- Present]), + sets:to_list(sets:filter(fun (Q) -> + N = amqqueue:get_name(Q), + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))) + end. + +count(VHost) -> + try + %% this is certainly suboptimal but there is no way to count + %% things using a secondary index in Mnesia. Our counter-table-per-node + %% won't work here because with master migration of mirrored queues + %% the "ownership" of queues by nodes becomes a non-trivial problem + %% that requires a proper consensus algorithm. + length(list_for_count(VHost)) + catch _:Err -> + rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n", + [VHost, Err]), + 0 + end. + +list_for_count(VHost) -> + list_with_possible_retry( + fun() -> + mnesia:dirty_index_read(rabbit_queue, + VHost, + amqqueue:field_vhost()) + end). + +-spec info_keys() -> rabbit_types:info_keys(). + +%% It should no default to classic queue keys, but a subset of those that must be shared +%% by all queue types. Not sure this is even being used, so will leave it here for backwards +%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports. +info_keys() -> rabbit_amqqueue_process:info_keys(). + +map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). + +is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) -> + false; +is_unresponsive(Q, Timeout) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), + try + delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), + false + catch + %% TODO catch any exit?? + exit:{timeout, _} -> + true + end; +is_unresponsive(Q, Timeout) when ?amqqueue_is_quorum(Q) -> + try + Leader = amqqueue:get_pid(Q), + case rabbit_fifo_client:stat(Leader, Timeout) of + {ok, _, _} -> false; + {timeout, _} -> true; + {error, _} -> true + end + catch + exit:{timeout, _} -> + true + end. + +format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); +format(Q) -> rabbit_amqqueue_process:format(Q). + +-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). + +info(Q) when ?is_amqqueue(Q) -> rabbit_queue_type:info(Q, all_keys). + + +-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> + rabbit_types:infos(). + +info(Q, Items) when ?is_amqqueue(Q) -> + rabbit_queue_type:info(Q, Items). + +info_down(Q, DownReason) -> + rabbit_queue_type:info_down(Q, DownReason). + +info_down(Q, Items, DownReason) -> + rabbit_queue_type:info_down(Q, Items, DownReason). + +-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. + +info_all(VHostPath) -> + map(list(VHostPath), fun (Q) -> info(Q) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end). + +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> + [rabbit_types:infos()]. + +info_all(VHostPath, Items) -> + map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). + +emit_info_local(VHostPath, Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)). + +emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + +collect_info_all(VHostPath, Items) -> + Nodes = rabbit_nodes:all_running(), + Ref = make_ref(), + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids), + wait_for_queues(Ref, length(Pids), []). + +wait_for_queues(Ref, N, Acc) -> + receive + {Ref, finished} when N == 1 -> + Acc; + {Ref, finished} -> + wait_for_queues(Ref, N - 1, Acc); + {Ref, Items, continue} -> + wait_for_queues(Ref, N, [Items | Acc]) + after + 1000 -> + Acc + end. + +emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, + list_down(VHostPath)). + +emit_unresponsive_local(VHostPath, Items, Timeout, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> case is_unresponsive(Q, Timeout) of + true -> info_down(Q, Items, unresponsive); + false -> [] + end + end, list_local(VHostPath) + ). + +emit_unresponsive(Nodes, VHostPath, Items, Timeout, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_unresponsive_local, + [VHostPath, Items, Timeout, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + +info_local(VHostPath) -> + map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). + +list_local(VHostPath) -> + [Q || Q <- list(VHostPath), + amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. + +-spec force_event_refresh(reference()) -> 'ok'. + +% Note: https://www.pivotaltracker.com/story/show/166962656 +% This event is necessary for the stats timer to be initialized with +% the correct values once the management agent has started +force_event_refresh(Ref) -> + %% note: quorum queuse emit stats on periodic ticks that run unconditionally, + %% so force_event_refresh is unnecessary (and, in fact, would only produce log noise) for QQs. + ClassicQs = list_by_type(rabbit_classic_queue), + [gen_server2:cast(amqqueue:get_pid(Q), + {force_event_refresh, Ref}) || Q <- ClassicQs], + ok. + +-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. +notify_policy_changed(Q) when ?is_amqqueue(Q) -> + rabbit_queue_type:policy_changed(Q). + +-spec consumers(amqqueue:amqqueue()) -> + [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), + boolean(), atom(), + rabbit_framing:amqp_table(), rabbit_types:username()}]. + +consumers(Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); +consumers(Q) when ?amqqueue_is_quorum(Q) -> + QPid = amqqueue:get_pid(Q), + case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of + {ok, {_, Result}, _} -> maps:values(Result); + _ -> [] + end; +consumers(Q) when ?amqqueue_is_stream(Q) -> + %% TODO how??? they only exist on the channel + %% we could list the offset listener on the writer but we don't even have a consumer tag, + %% only a (channel) pid and offset + []. + +-spec consumer_info_keys() -> rabbit_types:info_keys(). + +consumer_info_keys() -> ?CONSUMER_INFO_KEYS. + +-spec consumers_all(rabbit_types:vhost()) -> + [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]. + +consumers_all(VHostPath) -> + ConsumerInfoKeys = consumer_info_keys(), + lists:append( + map(list(VHostPath), + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). + +emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_consumers_local(VHostPath, Ref, AggregatorPid) -> + ConsumerInfoKeys = consumer_info_keys(), + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, + list_local(VHostPath)). + +get_queue_consumer_info(Q, ConsumerInfoKeys) -> + [lists:zip(ConsumerInfoKeys, + [amqqueue:get_name(Q), ChPid, CTag, + AckRequired, Prefetch, Active, ActivityStatus, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. + +-spec stat(amqqueue:amqqueue()) -> + {'ok', non_neg_integer(), non_neg_integer()}. +stat(Q) -> + rabbit_queue_type:stat(Q). + +-spec pid_of(amqqueue:amqqueue()) -> + pid(). + +pid_of(Q) -> amqqueue:get_pid(Q). + +-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> + pid() | rabbit_types:error('not_found'). + +pid_of(VHost, QueueName) -> + case lookup(rabbit_misc:r(VHost, queue, QueueName)) of + {ok, Q} -> pid_of(Q); + {error, not_found} = E -> E + end. + +-spec delete_exclusive(qpids(), pid()) -> 'ok'. + +delete_exclusive(QPids, ConnId) -> + rabbit_amqqueue_common:delete_exclusive(QPids, ConnId). + +-spec delete_immediately(qpids()) -> 'ok'. + +delete_immediately(QPids) -> + {Classic, Quorum} = filter_pid_per_type(QPids), + [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic], + case Quorum of + [] -> ok; + _ -> {error, cannot_delete_quorum_queues, Quorum} + end. + +delete_immediately_by_resource(Resources) -> + {Classic, Quorum} = filter_resource_per_type(Resources), + [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic], + [rabbit_quorum_queue:delete_immediately(Resource, QPid) + || {Resource, QPid} <- Quorum], + ok. + +-spec delete + (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> + qlen() | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; + (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> + qlen() | rabbit_types:error('in_use') | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; + (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> + qlen() | rabbit_types:error('not_empty') | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}; + (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> + qlen() | + rabbit_types:error('in_use') | + rabbit_types:error('not_empty') | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +delete(Q, IfUnused, IfEmpty, ActingUser) -> + rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser). + +%% delete_crashed* INCLUDED FOR BACKWARDS COMPATBILITY REASONS +delete_crashed(Q) when ?amqqueue_is_classic(Q) -> + rabbit_classic_queue:delete_crashed(Q). + +delete_crashed(Q, ActingUser) when ?amqqueue_is_classic(Q) -> + rabbit_classic_queue:delete_crashed(Q, ActingUser). + +-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'. +delete_crashed_internal(Q, ActingUser) when ?amqqueue_is_classic(Q) -> + rabbit_classic_queue:delete_crashed_internal(Q, ActingUser). + +-spec purge(amqqueue:amqqueue()) -> qlen(). +purge(Q) when ?is_amqqueue(Q) -> + rabbit_queue_type:purge(Q). + +-spec requeue(name(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. +requeue(QRef, {CTag, MsgIds}, QStates) -> + reject(QRef, true, {CTag, MsgIds}, QStates). + +-spec ack(name(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. +ack(QPid, {CTag, MsgIds}, QueueStates) -> + rabbit_queue_type:settle(QPid, complete, CTag, MsgIds, QueueStates). + + +-spec reject(name(), + boolean(), + {rabbit_fifo:consumer_tag(), [msg_id()]}, + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. +reject(QRef, Requeue, {CTag, MsgIds}, QStates) -> + Op = case Requeue of + true -> requeue; + false -> discard + end, + rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QStates). + +-spec notify_down_all(qpids(), pid()) -> ok_or_errors(). +notify_down_all(QPids, ChPid) -> + notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). + +-spec notify_down_all(qpids(), pid(), non_neg_integer()) -> + ok_or_errors(). +notify_down_all(QPids, ChPid, Timeout) -> + case rpc:call(node(), delegate, invoke, + [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of + {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}}; + {badrpc, Reason} -> {error, Reason}; + {_, Bads} -> + case lists:filter( + fun ({_Pid, {exit, {R, _}, _}}) -> + rabbit_misc:is_abnormal_exit(R); + ({_Pid, _}) -> false + end, Bads) of + [] -> ok; + Bads1 -> {error, Bads1} + end; + Error -> {error, Error} + end. + +-spec activate_limit_all(qpids(), pid()) -> ok. + +activate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], + delegate:invoke_no_result(QPids, {gen_server2, cast, + [{activate_limit, ChPid}]}). + +-spec credit(amqqueue:amqqueue(), + rabbit_types:ctag(), + non_neg_integer(), + boolean(), + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()}. +credit(Q, CTag, Credit, Drain, QStates) -> + rabbit_queue_type:credit(Q, CTag, Credit, Drain, QStates). + +-spec basic_get(amqqueue:amqqueue(), boolean(), pid(), rabbit_types:ctag(), + rabbit_queue_type:state()) -> + {'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} | + {'empty', rabbit_queue_type:state()} | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +basic_get(Q, NoAck, LimiterPid, CTag, QStates0) -> + rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0). + + +-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(), + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any(), rabbit_types:username(), + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state(), rabbit_queue_type:actions()} | + {error, term()} | + {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}. +basic_consume(Q, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) -> + + QName = amqqueue:get_name(Q), + %% first phase argument validation + %% each queue type may do further validations + ok = check_consume_arguments(QName, Args), + Spec = #{no_ack => NoAck, + channel_pid => ChPid, + limiter_pid => LimiterPid, + limiter_active => LimiterActive, + prefetch_count => ConsumerPrefetchCount, + consumer_tag => ConsumerTag, + exclusive_consume => ExclusiveConsume, + args => Args, + ok_msg => OkMsg, + acting_user => ActingUser}, + rabbit_queue_type:consume(Q, Spec, Contexts). + +-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(), + rabbit_types:username(), + rabbit_queue_type:state()) -> + {ok, rabbit_queue_type:state()} | {error, term()}. +basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) -> + rabbit_queue_type:cancel(Q, ConsumerTag, + OkMsg, ActingUser, QStates). + +-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. + +notify_decorators(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). + +notify_sent(QPid, ChPid) -> + rabbit_amqqueue_common:notify_sent(QPid, ChPid). + +notify_sent_queue_down(QPid) -> + rabbit_amqqueue_common:notify_sent_queue_down(QPid). + +-spec resume(pid(), pid()) -> 'ok'. + +resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, + [{resume, ChPid}]}). + +internal_delete1(QueueName, OnlyDurable) -> + internal_delete1(QueueName, OnlyDurable, normal). + +internal_delete1(QueueName, OnlyDurable, Reason) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + case Reason of + auto_delete -> + case mnesia:wread({rabbit_durable_queue, QueueName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) + end; + _ -> + mnesia:delete({rabbit_durable_queue, QueueName}) + end, + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_binding:remove_for_destination(QueueName, OnlyDurable). + +-spec internal_delete(name(), rabbit_types:username()) -> 'ok'. + +internal_delete(QueueName, ActingUser) -> + internal_delete(QueueName, ActingUser, normal). + +internal_delete(QueueName, ActingUser, Reason) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case {mnesia:wread({rabbit_queue, QueueName}), + mnesia:wread({rabbit_durable_queue, QueueName})} of + {[], []} -> + rabbit_misc:const(ok); + _ -> + Deletions = internal_delete1(QueueName, false, Reason), + T = rabbit_binding:process_deletions(Deletions, + ?INTERNAL_USER), + fun() -> + ok = T(), + rabbit_core_metrics:queue_deleted(QueueName), + ok = rabbit_event:notify(queue_deleted, + [{name, QueueName}, + {user_who_performed_action, ActingUser}]) + end + end + end). + +-spec forget_all_durable(node()) -> 'ok'. + +forget_all_durable(Node) -> + %% Note rabbit is not running so we avoid e.g. the worker pool. Also why + %% we don't invoke the return from rabbit_binding:process_deletions/1. + {atomic, ok} = + mnesia:sync_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_durable_queue, + amqqueue:pattern_match_all(), write), + [forget_node_for_queue(Node, Q) || + Q <- Qs, + is_local_to_node(amqqueue:get_pid(Q), Node)], + ok + end), + ok. + +%% Try to promote a mirror while down - it should recover as a +%% master. We try to take the oldest mirror here for best chance of +%% recovery. +forget_node_for_queue(_DeadNode, Q) + when ?amqqueue_is_quorum(Q) -> + ok; +forget_node_for_queue(DeadNode, Q) -> + RS = amqqueue:get_recoverable_slaves(Q), + forget_node_for_queue(DeadNode, RS, Q). + +forget_node_for_queue(_DeadNode, [], Q) -> + %% No mirrors to recover from, queue is gone. + %% Don't process_deletions since that just calls callbacks and we + %% are not really up. + Name = amqqueue:get_name(Q), + internal_delete1(Name, true); + +%% Should not happen, but let's be conservative. +forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> + forget_node_for_queue(DeadNode, T, Q); + +forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) -> + Type = amqqueue:get_type(Q), + case {node_permits_offline_promotion(H), Type} of + {false, _} -> forget_node_for_queue(DeadNode, T, Q); + {true, rabbit_classic_queue} -> + Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)), + ok = mnesia:write(rabbit_durable_queue, Q1, write); + {true, rabbit_quorum_queue} -> + ok + end. + +node_permits_offline_promotion(Node) -> + case node() of + Node -> not rabbit:is_running(); %% [1] + _ -> All = rabbit_mnesia:cluster_nodes(all), + Running = rabbit_nodes:all_running(), + lists:member(Node, All) andalso + not lists:member(Node, Running) %% [2] + end. +%% [1] In this case if we are a real running node (i.e. rabbitmqctl +%% has RPCed into us) then we cannot allow promotion. If on the other +%% hand we *are* rabbitmqctl impersonating the node for offline +%% node-forgetting then we can. +%% +%% [2] This is simpler; as long as it's down that's OK + +-spec run_backing_queue + (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> + 'ok'. + +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). + +-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'. + +set_ram_duration_target(QPid, Duration) -> + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). + +-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +-spec update_mirroring(pid()) -> 'ok'. + +update_mirroring(QPid) -> + ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). + +-spec sync_mirrors(amqqueue:amqqueue() | pid()) -> + 'ok' | rabbit_types:error('not_mirrored'). + +sync_mirrors(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); +sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). + +-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) -> + 'ok' | {'ok', 'not_syncing'}. + +cancel_sync_mirrors(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); +cancel_sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). + +-spec is_replicated(amqqueue:amqqueue()) -> boolean(). + +is_replicated(Q) when ?amqqueue_is_quorum(Q) -> + true; +is_replicated(Q) -> + rabbit_mirror_queue_misc:is_mirrored(Q). + +is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> + false; +is_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> + true. + +is_not_exclusive(Q) -> + not is_exclusive(Q). + +is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> + false; +is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> + Pid = amqqueue:get_pid(Q), + not rabbit_mnesia:is_process_alive(Pid). + +-spec has_synchronised_mirrors_online(amqqueue:amqqueue()) -> boolean(). +has_synchronised_mirrors_online(Q) -> + %% a queue with all mirrors down would have no mirror pids. + %% We treat these as in sync intentionally to avoid false positives. + MirrorPids = amqqueue:get_sync_slave_pids(Q), + MirrorPids =/= [] andalso lists:any(fun rabbit_misc:is_process_alive/1, MirrorPids). + +-spec on_node_up(node()) -> 'ok'. + +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + amqqueue:pattern_match_all(), write), + [maybe_clear_recoverable_node(Node, Q) || Q <- Qs], + ok + end). + +maybe_clear_recoverable_node(Node, Q) -> + SPids = amqqueue:get_sync_slave_pids(Q), + RSs = amqqueue:get_recoverable_slaves(Q), + case lists:member(Node, RSs) of + true -> + %% There is a race with + %% rabbit_mirror_queue_slave:record_synchronised/1 called + %% by the incoming mirror node and this function, called + %% by the master node. If this function is executed after + %% record_synchronised/1, the node is erroneously removed + %% from the recoverable mirrors list. + %% + %% We check if the mirror node's queue PID is alive. If it is + %% the case, then this function is executed after. In this + %% situation, we don't touch the queue record, it is already + %% correct. + DoClearNode = + case [SP || SP <- SPids, node(SP) =:= Node] of + [SPid] -> not rabbit_misc:is_process_alive(SPid); + _ -> true + end, + if + DoClearNode -> RSs1 = RSs -- [Node], + store_queue( + amqqueue:set_recoverable_slaves(Q, RSs1)); + true -> ok + end; + false -> + ok + end. + +-spec on_node_down(node()) -> 'ok'. + +on_node_down(Node) -> + {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node), + notify_queue_binding_deletions(QueueDeletions), + rabbit_core_metrics:queues_deleted(QueueNames), + notify_queues_deleted(QueueNames), + ok. + +delete_queues_on_node_down(Node) -> + lists:unzip(lists:flatten([ + rabbit_misc:execute_mnesia_transaction( + fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end + ) || Queues <- partition_queues(queues_to_delete_when_node_down(Node)) + ])). + +delete_queue(QueueName) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + rabbit_binding:remove_transient_for_destination(QueueName). + +% If there are many queues and we delete them all in a single Mnesia transaction, +% this can block all other Mnesia operations for a really long time. +% In situations where a node wants to (re-)join a cluster, +% Mnesia won't be able to sync on the new node until this operation finishes. +% As a result, we want to have multiple Mnesia transactions so that other +% operations can make progress in between these queue delete transactions. +% +% 10 queues per Mnesia transaction is an arbitrary number, but it seems to work OK with 50k queues per node. +partition_queues([Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9 | T]) -> + [[Q0,Q1,Q2,Q3,Q4,Q5,Q6,Q7,Q8,Q9] | partition_queues(T)]; +partition_queues(T) -> + [T]. + +queues_to_delete_when_node_down(NodeDown) -> + rabbit_misc:execute_mnesia_transaction(fun () -> + qlc:e(qlc:q([amqqueue:get_name(Q) || + Q <- mnesia:table(rabbit_queue), + amqqueue:qnode(Q) == NodeDown andalso + not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso + (not rabbit_amqqueue:is_replicated(Q) orelse + rabbit_amqqueue:is_dead_exclusive(Q))] + )) + end). + +notify_queue_binding_deletions(QueueDeletions) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun() -> + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + QueueDeletions + ), + ?INTERNAL_USER + ) + end + ). + +notify_queues_deleted(QueueDeletions) -> + lists:foreach( + fun(Queue) -> + ok = rabbit_event:notify(queue_deleted, + [{name, Queue}, + {user, ?INTERNAL_USER}]) + end, + QueueDeletions). + +-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue(). + +pseudo_queue(QueueName, Pid) -> + pseudo_queue(QueueName, Pid, false). + +-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). + +pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) + when is_pid(Pid) andalso + is_boolean(Durable) -> + amqqueue:new(QueueName, + Pid, + Durable, + false, + none, % Owner, + [], + undefined, % VHost, + #{user => undefined}, % ActingUser + rabbit_classic_queue % Type + ). + +-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). + +immutable(Q) -> amqqueue:set_immutable(Q). + +-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'. + +deliver(Qs, Delivery) -> + _ = rabbit_queue_type:deliver(Qs, Delivery, stateless), + ok. + +get_quorum_nodes(Q) -> + case amqqueue:get_type_state(Q) of + #{nodes := Nodes} -> + Nodes; + _ -> + [] + end. |