summaryrefslogtreecommitdiff
path: root/src/rabbit_quorum_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_quorum_queue.erl')
-rw-r--r--src/rabbit_quorum_queue.erl1496
1 files changed, 0 insertions, 1496 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
deleted file mode 100644
index a51fc3f43e..0000000000
--- a/src/rabbit_quorum_queue.erl
+++ /dev/null
@@ -1,1496 +0,0 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
-%% License, v. 2.0. If a copy of the MPL was not distributed with this
-%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
-%%
-%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
--module(rabbit_quorum_queue).
-
--behaviour(rabbit_queue_type).
-
--export([init/1,
- close/1,
- update/2,
- handle_event/2]).
--export([is_recoverable/1, recover/2, stop/1, delete/4, delete_immediately/2]).
--export([state_info/1, info/2, stat/1, infos/1]).
--export([settle/4, dequeue/4, consume/3, cancel/5]).
--export([credit/4]).
--export([purge/1]).
--export([stateless_deliver/2, deliver/3, deliver/2]).
--export([dead_letter_publish/4]).
--export([queue_name/1]).
--export([cluster_state/1, status/2]).
--export([update_consumer_handler/8, update_consumer/9]).
--export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/3, spawn_deleter/1]).
--export([rpc_delete_metrics/1]).
--export([format/1]).
--export([open_files/1]).
--export([peek/2, peek/3]).
--export([add_member/4]).
--export([delete_member/3]).
--export([requeue/3]).
--export([policy_changed/1]).
--export([format_ra_event/3]).
--export([cleanup_data_dir/0]).
--export([shrink_all/1,
- grow/4]).
--export([transfer_leadership/2, get_replicas/1, queue_length/1]).
--export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
--export([file_handle_release_reservation/0]).
--export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
- filter_quorum_critical/1, filter_quorum_critical/2,
- all_replica_states/0]).
--export([capabilities/0]).
--export([repair_amqqueue_nodes/1,
- repair_amqqueue_nodes/2
- ]).
--export([reclaim_memory/2]).
-
--export([is_enabled/0,
- declare/2]).
-
--import(rabbit_queue_type_util, [args_policy_lookup/3,
- qname_to_internal_name/1]).
-
--include_lib("stdlib/include/qlc.hrl").
--include("rabbit.hrl").
--include("amqqueue.hrl").
-
--type msg_id() :: non_neg_integer().
--type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}.
-
--define(STATISTICS_KEYS,
- [policy,
- operator_policy,
- effective_policy_definition,
- consumers,
- memory,
- state,
- garbage_collection,
- leader,
- online,
- members,
- open_files,
- single_active_consumer_pid,
- single_active_consumer_ctag,
- messages_ram,
- message_bytes_ram
- ]).
-
--define(INFO_KEYS, [name, durable, auto_delete, arguments, pid, messages, messages_ready,
- messages_unacknowledged, local_state, type] ++ ?STATISTICS_KEYS).
-
--define(RPC_TIMEOUT, 1000).
--define(TICK_TIMEOUT, 5000). %% the ra server tick time
--define(DELETE_TIMEOUT, 5000).
--define(ADD_MEMBER_TIMEOUT, 5000).
-
-%%----------- rabbit_queue_type ---------------------------------------------
-
--spec is_enabled() -> boolean().
-is_enabled() ->
- rabbit_feature_flags:is_enabled(quorum_queue).
-
-%%----------------------------------------------------------------------------
-
--spec init(amqqueue:amqqueue()) -> rabbit_fifo_client:state().
-init(Q) when ?is_amqqueue(Q) ->
- {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
- %% This lookup could potentially return an {error, not_found}, but we do not
- %% know what to do if the queue has `disappeared`. Let it crash.
- {Name, _LeaderNode} = Leader = amqqueue:get_pid(Q),
- Nodes = get_nodes(Q),
- QName = amqqueue:get_name(Q),
- %% Ensure the leader is listed first
- Servers0 = [{Name, N} || N <- Nodes],
- Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(QName, Servers, SoftLimit,
- fun() -> credit_flow:block(Name) end,
- fun() -> credit_flow:unblock(Name), ok end).
-
--spec close(rabbit_fifo_client:state()) -> ok.
-close(_State) ->
- ok.
-
--spec update(amqqueue:amqqueue(), rabbit_fifo_client:state()) ->
- rabbit_fifo_client:state().
-update(Q, State) when ?amqqueue_is_quorum(Q) ->
- %% QQ state maintains it's own updates
- State.
-
--spec handle_event({amqqueue:ra_server_id(), any()},
- rabbit_fifo_client:state()) ->
- {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
- eol |
- {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
-handle_event({From, Evt}, QState) ->
- rabbit_fifo_client:handle_ra_event(From, Evt, QState).
-
--spec declare(amqqueue:amqqueue(), node()) ->
- {new | existing, amqqueue:amqqueue()} |
- {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
-declare(Q, _Node) when ?amqqueue_is_quorum(Q) ->
- case rabbit_queue_type_util:run_checks(
- [fun rabbit_queue_type_util:check_auto_delete/1,
- fun rabbit_queue_type_util:check_exclusive/1,
- fun rabbit_queue_type_util:check_non_durable/1],
- Q) of
- ok ->
- start_cluster(Q);
- Err ->
- Err
- end.
-
-start_cluster(Q) ->
- QName = amqqueue:get_name(Q),
- Durable = amqqueue:is_durable(Q),
- AutoDelete = amqqueue:is_auto_delete(Q),
- Arguments = amqqueue:get_arguments(Q),
- Opts = amqqueue:get_options(Q),
- ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
- QuorumSize = get_default_quorum_initial_group_size(Arguments),
- RaName = qname_to_internal_name(QName),
- Id = {RaName, node()},
- Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)),
- NewQ0 = amqqueue:set_pid(Q, Id),
- NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}),
- case rabbit_amqqueue:internal_declare(NewQ1, false) of
- {created, NewQ} ->
- TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
- RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
- || ServerId <- members(NewQ)],
- case ra:start_cluster(RaConfs) of
- {ok, _, _} ->
- %% TODO: handle error - what should be done if the
- %% config cannot be updated
- ok = rabbit_fifo_client:update_machine_state(Id,
- ra_machine_config(NewQ)),
- %% force a policy change to ensure the latest config is
- %% updated even when running the machine version from 0
- rabbit_event:notify(queue_created,
- [{name, QName},
- {durable, Durable},
- {auto_delete, AutoDelete},
- {arguments, Arguments},
- {user_who_performed_action,
- ActingUser}]),
- {new, NewQ};
- {error, Error} ->
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- {protocol_error, internal_error,
- "Cannot declare a queue '~s' on node '~s': ~255p",
- [rabbit_misc:rs(QName), node(), Error]}
- end;
- {existing, _} = Ex ->
- Ex
- end.
-
-ra_machine(Q) ->
- {module, rabbit_fifo, ra_machine_config(Q)}.
-
-ra_machine_config(Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- {Name, _} = amqqueue:get_pid(Q),
- %% take the minimum value of the policy and the queue arg if present
- MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
- %% prefer the policy defined strategy if available
- Overflow = args_policy_lookup(<<"overflow">>, fun (A, _B) -> A end , Q),
- MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
- MaxMemoryLength = args_policy_lookup(<<"max-in-memory-length">>, fun min/2, Q),
- MaxMemoryBytes = args_policy_lookup(<<"max-in-memory-bytes">>, fun min/2, Q),
- DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
- Expires = args_policy_lookup(<<"expires">>,
- fun (A, _B) -> A end,
- Q),
- #{name => Name,
- queue_resource => QName,
- dead_letter_handler => dlx_mfa(Q),
- become_leader_handler => {?MODULE, become_leader, [QName]},
- max_length => MaxLength,
- max_bytes => MaxBytes,
- max_in_memory_length => MaxMemoryLength,
- max_in_memory_bytes => MaxMemoryBytes,
- single_active_consumer_on => single_active_consumer_on(Q),
- delivery_limit => DeliveryLimit,
- overflow_strategy => overflow(Overflow, drop_head, QName),
- created => erlang:system_time(millisecond),
- expires => Expires
- }.
-
-single_active_consumer_on(Q) ->
- QArguments = amqqueue:get_arguments(Q),
- case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
- {bool, true} -> true;
- _ -> false
- end.
-
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
-
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
- catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, Active, ActivityStatus, Args).
-
-cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
- [QName, ChPid, ConsumerTag]).
-
-cancel_consumer(QName, ChPid, ConsumerTag) ->
- catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
- emit_consumer_deleted(ChPid, ConsumerTag, QName, ?INTERNAL_USER).
-
-local_or_remote_handler(ChPid, Module, Function, Args) ->
- Node = node(ChPid),
- case Node == node() of
- true ->
- erlang:apply(Module, Function, Args);
- false ->
- %% this could potentially block for a while if the node is
- %% in disconnected state or tcp buffers are full
- rpc:cast(Node, Module, Function, Args)
- end.
-
-become_leader(QName, Name) ->
- Fun = fun (Q1) ->
- amqqueue:set_state(
- amqqueue:set_pid(Q1, {Name, node()}),
- live)
- end,
- %% as this function is called synchronously when a ra node becomes leader
- %% we need to ensure there is no chance of blocking as else the ra node
- %% may not be able to establish it's leadership
- spawn(fun() ->
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- rabbit_amqqueue:update(QName, Fun)
- end),
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q0} when ?is_amqqueue(Q0) ->
- Nodes = get_nodes(Q0),
- [rpc:call(Node, ?MODULE, rpc_delete_metrics,
- [QName], ?RPC_TIMEOUT)
- || Node <- Nodes, Node =/= node()];
- _ ->
- ok
- end
- end).
-
--spec all_replica_states() -> {node(), #{atom() => atom()}}.
-all_replica_states() ->
- Rows = ets:tab2list(ra_state),
- {node(), maps:from_list(Rows)}.
-
--spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
-list_with_minimum_quorum() ->
- filter_quorum_critical(
- rabbit_amqqueue:list_local_quorum_queues()).
-
--spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
-list_with_minimum_quorum_for_cli() ->
- QQs = list_with_minimum_quorum(),
- [begin
- #resource{name = Name} = amqqueue:get_name(Q),
- #{
- <<"readable_name">> => rabbit_data_coercion:to_binary(rabbit_misc:rs(amqqueue:get_name(Q))),
- <<"name">> => Name,
- <<"virtual_host">> => amqqueue:get_vhost(Q),
- <<"type">> => <<"quorum">>
- }
- end || Q <- QQs].
-
--spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
-filter_quorum_critical(Queues) ->
- %% Example map of QQ replica states:
- %% #{rabbit@warp10 =>
- %% #{'%2F_qq.636' => leader,'%2F_qq.243' => leader,
- %% '%2F_qq.1939' => leader,'%2F_qq.1150' => leader,
- %% '%2F_qq.1109' => leader,'%2F_qq.1654' => leader,
- %% '%2F_qq.1679' => leader,'%2F_qq.1003' => leader,
- %% '%2F_qq.1593' => leader,'%2F_qq.1765' => leader,
- %% '%2F_qq.933' => leader,'%2F_qq.38' => leader,
- %% '%2F_qq.1357' => leader,'%2F_qq.1345' => leader,
- %% '%2F_qq.1694' => leader,'%2F_qq.994' => leader,
- %% '%2F_qq.490' => leader,'%2F_qq.1704' => leader,
- %% '%2F_qq.58' => leader,'%2F_qq.564' => leader,
- %% '%2F_qq.683' => leader,'%2F_qq.386' => leader,
- %% '%2F_qq.753' => leader,'%2F_qq.6' => leader,
- %% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader,
- %% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}}
- ReplicaStates = maps:from_list(
- rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
- ?MODULE, all_replica_states, [])),
- filter_quorum_critical(Queues, ReplicaStates).
-
--spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()].
-
-filter_quorum_critical(Queues, ReplicaStates) ->
- lists:filter(fun (Q) ->
- MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q),
- {Name, _Node} = amqqueue:get_pid(Q),
- AllUp = lists:filter(fun (N) ->
- {Name, _} = amqqueue:get_pid(Q),
- case maps:get(N, ReplicaStates, undefined) of
- #{Name := State} when State =:= follower orelse State =:= leader ->
- true;
- _ -> false
- end
- end, MemberNodes),
- MinQuorum = length(MemberNodes) div 2 + 1,
- length(AllUp) =< MinQuorum
- end, Queues).
-
-capabilities() ->
- #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
- <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
- <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
- queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
- <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
- <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
- <<"x-max-in-memory-bytes">>, <<"x-overflow">>,
- <<"x-single-active-consumer">>, <<"x-queue-type">>,
- <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
- consumer_arguments => [<<"x-priority">>, <<"x-credit">>],
- server_named => false}.
-
-rpc_delete_metrics(QName) ->
- ets:delete(queue_coarse_metrics, QName),
- ets:delete(queue_metrics, QName),
- ok.
-
-spawn_deleter(QName) ->
- spawn(fun () ->
- {ok, Q} = rabbit_amqqueue:lookup(QName),
- delete(Q, false, false, <<"expired">>)
- end).
-
-handle_tick(QName,
- {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
- Nodes) ->
- %% this makes calls to remote processes so cannot be run inside the
- %% ra server
- Self = self(),
- _ = spawn(fun() ->
- R = reductions(Name),
- rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
- Util = case C of
- 0 -> 0;
- _ -> rabbit_fifo:usage(Name)
- end,
- Infos = [{consumers, C},
- {consumer_utilisation, Util},
- {message_bytes_ready, MsgBytesReady},
- {message_bytes_unacknowledged, MsgBytesUnack},
- {message_bytes, MsgBytesReady + MsgBytesUnack},
- {message_bytes_persistent, MsgBytesReady + MsgBytesUnack},
- {messages_persistent, M}
-
- | infos(QName, ?STATISTICS_KEYS -- [consumers])],
- rabbit_core_metrics:queue_stats(QName, Infos),
- rabbit_event:notify(queue_stats,
- Infos ++ [{name, QName},
- {messages, M},
- {messages_ready, MR},
- {messages_unacknowledged, MU},
- {reductions, R}]),
- ok = repair_leader_record(QName, Self),
- ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
- case Nodes -- ExpectedNodes of
- [] ->
- ok;
- Stale ->
- rabbit_log:info("~s: stale nodes detected. Purging ~w~n",
- [rabbit_misc:rs(QName), Stale]),
- %% pipeline purge command
- {ok, Q} = rabbit_amqqueue:lookup(QName),
- ok = ra:pipeline_command(amqqueue:get_pid(Q),
- rabbit_fifo:make_purge_nodes(Stale)),
-
- ok
- end
- end),
- ok.
-
-repair_leader_record(QName, Self) ->
- {ok, Q} = rabbit_amqqueue:lookup(QName),
- Node = node(),
- case amqqueue:get_pid(Q) of
- {_, Node} ->
- %% it's ok - we don't need to do anything
- ok;
- _ ->
- rabbit_log:debug("~s: repairing leader record",
- [rabbit_misc:rs(QName)]),
- {_, Name} = erlang:process_info(Self, registered_name),
- become_leader(QName, Name)
- end,
- ok.
-
-repair_amqqueue_nodes(VHost, QueueName) ->
- QName = #resource{virtual_host = VHost, name = QueueName, kind = queue},
- repair_amqqueue_nodes(QName).
-
--spec repair_amqqueue_nodes(rabbit_types:r('queue') | amqqueue:amqqueue()) ->
- ok | repaired.
-repair_amqqueue_nodes(QName = #resource{}) ->
- {ok, Q0} = rabbit_amqqueue:lookup(QName),
- repair_amqqueue_nodes(Q0);
-repair_amqqueue_nodes(Q0) ->
- QName = amqqueue:get_name(Q0),
- Leader = amqqueue:get_pid(Q0),
- {ok, Members, _} = ra:members(Leader),
- RaNodes = [N || {_, N} <- Members],
- #{nodes := Nodes} = amqqueue:get_type_state(Q0),
- case lists:sort(RaNodes) =:= lists:sort(Nodes) of
- true ->
- %% up to date
- ok;
- false ->
- %% update amqqueue record
- Fun = fun (Q) ->
- TS0 = amqqueue:get_type_state(Q),
- TS = TS0#{nodes => RaNodes},
- amqqueue:set_type_state(Q, TS)
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- rabbit_amqqueue:update(QName, Fun)
- end),
- repaired
- end.
-
-reductions(Name) ->
- try
- {reductions, R} = process_info(whereis(Name), reductions),
- R
- catch
- error:badarg ->
- 0
- end.
-
-is_recoverable(Q) ->
- Node = node(),
- Nodes = get_nodes(Q),
- lists:member(Node, Nodes).
-
--spec recover(binary(), [amqqueue:amqqueue()]) ->
- {[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
-recover(_Vhost, Queues) ->
- lists:foldl(
- fun (Q0, {R0, F0}) ->
- {Name, _} = amqqueue:get_pid(Q0),
- QName = amqqueue:get_name(Q0),
- Nodes = get_nodes(Q0),
- Formatter = {?MODULE, format_ra_event, [QName]},
- Res = case ra:restart_server({Name, node()},
- #{ra_event_formatter => Formatter}) of
- ok ->
- % queue was restarted, good
- ok;
- {error, Err1}
- when Err1 == not_started orelse
- Err1 == name_not_registered ->
- % queue was never started on this node
- % so needs to be started from scratch.
- Machine = ra_machine(Q0),
- RaNodes = [{Name, Node} || Node <- Nodes],
- case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
- ok -> ok;
- Err2 ->
- rabbit_log:warning("recover: quorum queue ~w could not"
- " be started ~w", [Name, Err2]),
- fail
- end;
- {error, {already_started, _}} ->
- %% this is fine and can happen if a vhost crashes and performs
- %% recovery whilst the ra application and servers are still
- %% running
- ok;
- Err ->
- %% catch all clause to avoid causing the vhost not to start
- rabbit_log:warning("recover: quorum queue ~w could not be "
- "restarted ~w", [Name, Err]),
- fail
- end,
- %% we have to ensure the quorum queue is
- %% present in the rabbit_queue table and not just in
- %% rabbit_durable_queue
- %% So many code paths are dependent on this.
- {ok, Q} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q0),
- case Res of
- ok ->
- {[Q | R0], F0};
- fail ->
- {R0, [Q | F0]}
- end
- end, {[], []}, Queues).
-
--spec stop(rabbit_types:vhost()) -> ok.
-stop(VHost) ->
- _ = [begin
- Pid = amqqueue:get_pid(Q),
- ra:stop_server(Pid)
- end || Q <- find_quorum_queues(VHost)],
- ok.
-
--spec delete(amqqueue:amqqueue(),
- boolean(), boolean(),
- rabbit_types:username()) ->
- {ok, QLen :: non_neg_integer()} |
- {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
-delete(Q, true, _IfEmpty, _ActingUser) when ?amqqueue_is_quorum(Q) ->
- {protocol_error, not_implemented,
- "cannot delete ~s. queue.delete operations with if-unused flag set are not supported by quorum queues",
- [rabbit_misc:rs(amqqueue:get_name(Q))]};
-delete(Q, _IfUnused, true, _ActingUser) when ?amqqueue_is_quorum(Q) ->
- {protocol_error, not_implemented,
- "cannot delete ~s. queue.delete operations with if-empty flag set are not supported by quorum queues",
- [rabbit_misc:rs(amqqueue:get_name(Q))]};
-delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- QName = amqqueue:get_name(Q),
- QNodes = get_nodes(Q),
- %% TODO Quorum queue needs to support consumer tracking for IfUnused
- Timeout = ?DELETE_TIMEOUT,
- {ok, ReadyMsgs, _} = stat(Q),
- Servers = [{Name, Node} || Node <- QNodes],
- case ra:delete_cluster(Servers, Timeout) of
- {ok, {_, LeaderNode} = Leader} ->
- MRef = erlang:monitor(process, Leader),
- receive
- {'DOWN', MRef, process, _, _} ->
- ok
- after Timeout ->
- ok = force_delete_queue(Servers)
- end,
- ok = delete_queue_data(QName, ActingUser),
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- ?RPC_TIMEOUT),
- {ok, ReadyMsgs};
- {error, {no_more_servers_to_try, Errs}} ->
- case lists:all(fun({{error, noproc}, _}) -> true;
- (_) -> false
- end, Errs) of
- true ->
- %% If all ra nodes were already down, the delete
- %% has succeed
- delete_queue_data(QName, ActingUser),
- {ok, ReadyMsgs};
- false ->
- %% attempt forced deletion of all servers
- rabbit_log:warning(
- "Could not delete quorum queue '~s', not enough nodes "
- " online to reach a quorum: ~255p."
- " Attempting force delete.",
- [rabbit_misc:rs(QName), Errs]),
- ok = force_delete_queue(Servers),
- delete_queue_data(QName, ActingUser),
- {ok, ReadyMsgs}
- end
- end.
-
-force_delete_queue(Servers) ->
- [begin
- case catch(ra:force_delete_server(S)) of
- ok -> ok;
- Err ->
- rabbit_log:warning(
- "Force delete of ~w failed with: ~w"
- "This may require manual data clean up~n",
- [S, Err]),
- ok
- end
- end || S <- Servers],
- ok.
-
-delete_queue_data(QName, ActingUser) ->
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- ok.
-
-
-delete_immediately(Resource, {_Name, _} = QPid) ->
- _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
- {ok, _} = ra:delete_cluster([QPid]),
- rabbit_core_metrics:queue_deleted(Resource),
- ok.
-
-settle(complete, CTag, MsgIds, QState) ->
- rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState);
-settle(requeue, CTag, MsgIds, QState) ->
- rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState);
-settle(discard, CTag, MsgIds, QState) ->
- rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState).
-
-credit(CTag, Credit, Drain, QState) ->
- rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
-
--spec dequeue(NoAck :: boolean(), pid(),
- rabbit_types:ctag(), rabbit_fifo_client:state()) ->
- {empty, rabbit_fifo_client:state()} |
- {ok, QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()} |
- {error, term()}.
-dequeue(NoAck, _LimiterPid, CTag0, QState0) ->
- CTag = quorum_ctag(CTag0),
- Settlement = case NoAck of
- true ->
- settled;
- false ->
- unsettled
- end,
- rabbit_fifo_client:dequeue(CTag, Settlement, QState0).
-
--spec consume(amqqueue:amqqueue(),
- rabbit_queue_type:consume_spec(),
- rabbit_fifo_client:state()) ->
- {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
- {error, global_qos_not_supported_for_queue_type}.
-consume(Q, #{limiter_active := true}, _State)
- when ?amqqueue_is_quorum(Q) ->
- {error, global_qos_not_supported_for_queue_type};
-consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
- #{no_ack := NoAck,
- channel_pid := ChPid,
- prefetch_count := ConsumerPrefetchCount,
- consumer_tag := ConsumerTag0,
- exclusive_consume := ExclusiveConsume,
- args := Args,
- ok_msg := OkMsg,
- acting_user := ActingUser} = Spec,
- %% TODO: validate consumer arguments
- %% currently quorum queues do not support any arguments
- QName = amqqueue:get_name(Q),
- QPid = amqqueue:get_pid(Q),
- maybe_send_reply(ChPid, OkMsg),
- ConsumerTag = quorum_ctag(ConsumerTag0),
- %% A prefetch count of 0 means no limitation,
- %% let's make it into something large for ra
- Prefetch = case ConsumerPrefetchCount of
- 0 -> 2000;
- Other -> Other
- end,
- %% consumer info is used to describe the consumer properties
- AckRequired = not NoAck,
- ConsumerMeta = #{ack => AckRequired,
- prefetch => ConsumerPrefetchCount,
- args => Args,
- username => ActingUser},
- {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag,
- Prefetch,
- ConsumerMeta,
- QState0),
- case ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1) of
- {ok, {_, SacResult}, _} ->
- SingleActiveConsumerOn = single_active_consumer_on(Q),
- {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- {true, up};
- {true, {value, {ConsumerTag, ChPid}}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end,
- rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, Prefetch,
- Args, none, ActingUser),
- {ok, QState, []};
- {error, Error} ->
- Error;
- {timeout, _} ->
- {error, timeout}
- end.
-
-% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
-% {'ok', rabbit_fifo_client:state()}.
-
-cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
- maybe_send_reply(self(), OkMsg),
- rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
-
-emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref, ActingUser) ->
- rabbit_event:notify(consumer_created,
- [{consumer_tag, CTag},
- {exclusive, Exclusive},
- {ack_required, AckRequired},
- {channel, ChPid},
- {queue, QName},
- {prefetch_count, PrefetchCount},
- {arguments, Args},
- {user_who_performed_action, ActingUser}],
- Ref).
-
-emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
- rabbit_event:notify(consumer_deleted,
- [{consumer_tag, ConsumerTag},
- {channel, ChPid},
- {queue, QName},
- {user_who_performed_action, ActingUser}]).
-
--spec stateless_deliver(amqqueue:ra_server_id(), rabbit_types:delivery()) -> 'ok'.
-
-stateless_deliver(ServerId, Delivery) ->
- ok = rabbit_fifo_client:untracked_enqueue([ServerId],
- Delivery#delivery.message).
-
--spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
- rabbit_fifo_client:state()) ->
- {ok | slow, rabbit_fifo_client:state()} |
- {reject_publish, rabbit_fifo_client:state()}.
-deliver(false, Delivery, QState0) ->
- case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
- {ok, _} = Res -> Res;
- {slow, _} = Res -> Res;
- {reject_publish, State} ->
- {ok, State}
- end;
-deliver(true, Delivery, QState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0).
-
-deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
- lists:foldl(
- fun({Q, stateless}, {Qs, Actions}) ->
- QRef = amqqueue:get_pid(Q),
- ok = rabbit_fifo_client:untracked_enqueue(
- [QRef], Delivery#delivery.message),
- {Qs, Actions};
- ({Q, S0}, {Qs, Actions}) ->
- case deliver(Confirm, Delivery, S0) of
- {reject_publish, S} ->
- Seq = Delivery#delivery.msg_seq_no,
- QName = rabbit_fifo_client:cluster_name(S),
- {[{Q, S} | Qs], [{rejected, QName, [Seq]} | Actions]};
- {_, S} ->
- {[{Q, S} | Qs], Actions}
- end
- end, {[], []}, QSs).
-
-
-state_info(S) ->
- #{pending_raft_commands => rabbit_fifo_client:pending_size(S)}.
-
-
-
--spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
-infos(QName) ->
- infos(QName, ?STATISTICS_KEYS).
-
-infos(QName, Keys) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} ->
- info(Q, Keys);
- {error, not_found} ->
- []
- end.
-
-info(Q, all_keys) ->
- info(Q, ?INFO_KEYS);
-info(Q, Items) ->
- lists:foldr(fun(totals, Acc) ->
- i_totals(Q) ++ Acc;
- (type_specific, Acc) ->
- format(Q) ++ Acc;
- (Item, Acc) ->
- [{Item, i(Item, Q)} | Acc]
- end, [], Items).
-
--spec stat(amqqueue:amqqueue()) ->
- {'ok', non_neg_integer(), non_neg_integer()}.
-stat(Q) when ?is_amqqueue(Q) ->
- %% same short default timeout as in rabbit_fifo_client:stat/1
- stat(Q, 250).
-
--spec stat(amqqueue:amqqueue(), non_neg_integer()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-
-stat(Q, Timeout) when ?is_amqqueue(Q) ->
- Leader = amqqueue:get_pid(Q),
- try
- case rabbit_fifo_client:stat(Leader, Timeout) of
- {ok, _, _} = Success -> Success;
- {error, _} -> {ok, 0, 0};
- {timeout, _} -> {ok, 0, 0}
- end
- catch
- _:_ ->
- %% Leader is not available, cluster might be in minority
- {ok, 0, 0}
- end.
-
--spec purge(amqqueue:amqqueue()) ->
- {ok, non_neg_integer()}.
-purge(Q) when ?is_amqqueue(Q) ->
- Node = amqqueue:get_pid(Q),
- rabbit_fifo_client:purge(Node).
-
-requeue(ConsumerTag, MsgIds, QState) ->
- rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
-
-cleanup_data_dir() ->
- Names = [begin
- {Name, _} = amqqueue:get_pid(Q),
- Name
- end
- || Q <- rabbit_amqqueue:list_by_type(?MODULE),
- lists:member(node(), get_nodes(Q))],
- NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
- Registered = ra_directory:list_registered(),
- Running = Names ++ NoQQClusters,
- _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
- not lists:member(Name, Running)],
- ok.
-
-maybe_delete_data_dir(UId) ->
- Dir = ra_env:server_data_dir(UId),
- {ok, Config} = ra_log:read_config(Dir),
- case maps:get(machine, Config) of
- {module, rabbit_fifo, _} ->
- ra_lib:recursive_delete(Dir),
- ra_directory:unregister_name(UId);
- _ ->
- ok
- end.
-
-policy_changed(Q) ->
- QPid = amqqueue:get_pid(Q),
- _ = rabbit_fifo_client:update_machine_state(QPid, ra_machine_config(Q)),
- ok.
-
--spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
-
-cluster_state(Name) ->
- case whereis(Name) of
- undefined -> down;
- _ ->
- case ets:lookup(ra_state, Name) of
- [{_, recover}] -> recovering;
- _ -> running
- end
- end.
-
--spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) ->
- [[{binary(), term()}]] | {error, term()}.
-status(Vhost, QueueName) ->
- %% Handle not found queues
- QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
- RName = qname_to_internal_name(QName),
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- Nodes = get_nodes(Q),
- [begin
- case get_sys_status({RName, N}) of
- {ok, Sys} ->
- {_, M} = lists:keyfind(ra_server_state, 1, Sys),
- {_, RaftState} = lists:keyfind(raft_state, 1, Sys),
- #{commit_index := Commit,
- machine_version := MacVer,
- current_term := Term,
- log := #{last_index := Last,
- snapshot_index := SnapIdx}} = M,
- [{<<"Node Name">>, N},
- {<<"Raft State">>, RaftState},
- {<<"Log Index">>, Last},
- {<<"Commit Index">>, Commit},
- {<<"Snapshot Index">>, SnapIdx},
- {<<"Term">>, Term},
- {<<"Machine Version">>, MacVer}
- ];
- {error, Err} ->
- [{<<"Node Name">>, N},
- {<<"Raft State">>, Err},
- {<<"Log Index">>, <<>>},
- {<<"Commit Index">>, <<>>},
- {<<"Snapshot Index">>, <<>>},
- {<<"Term">>, <<>>},
- {<<"Machine Version">>, <<>>}
- ]
- end
- end || N <- Nodes];
- {error, not_found} = E ->
- E
- end.
-
-get_sys_status(Proc) ->
- try lists:nth(5, element(4, sys:get_status(Proc))) of
- Sys -> {ok, Sys}
- catch
- _:Err when is_tuple(Err) ->
- {error, element(1, Err)};
- _:_ ->
- {error, other}
-
- end.
-
-
-add_member(VHost, Name, Node, Timeout) ->
- QName = #resource{virtual_host = VHost, name = Name, kind = queue},
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- QNodes = get_nodes(Q),
- case lists:member(Node, rabbit_nodes:all_running()) of
- false ->
- {error, node_not_running};
- true ->
- case lists:member(Node, QNodes) of
- true ->
- %% idempotent by design
- ok;
- false ->
- add_member(Q, Node, Timeout)
- end
- end;
- {error, not_found} = E ->
- E
- end.
-
-add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
- {RaName, _} = amqqueue:get_pid(Q),
- QName = amqqueue:get_name(Q),
- %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
- ServerId = {RaName, Node},
- Members = members(Q),
- TickTimeout = application:get_env(rabbit, quorum_tick_interval,
- ?TICK_TIMEOUT),
- Conf = make_ra_conf(Q, ServerId, TickTimeout),
- case ra:start_server(Conf) of
- ok ->
- case ra:add_member(Members, ServerId, Timeout) of
- {ok, _, Leader} ->
- Fun = fun(Q1) ->
- Q2 = update_type_state(
- Q1, fun(#{nodes := Nodes} = Ts) ->
- Ts#{nodes => [Node | Nodes]}
- end),
- amqqueue:set_pid(Q2, Leader)
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun() -> rabbit_amqqueue:update(QName, Fun) end),
- ok;
- {timeout, _} ->
- _ = ra:force_delete_server(ServerId),
- _ = ra:remove_member(Members, ServerId),
- {error, timeout};
- E ->
- _ = ra:force_delete_server(ServerId),
- E
- end;
- E ->
- E
- end.
-
-delete_member(VHost, Name, Node) ->
- QName = #resource{virtual_host = VHost, name = Name, kind = queue},
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- QNodes = get_nodes(Q),
- case lists:member(Node, QNodes) of
- false ->
- %% idempotent by design
- ok;
- true ->
- delete_member(Q, Node)
- end;
- {error, not_found} = E ->
- E
- end.
-
-
-delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
- QName = amqqueue:get_name(Q),
- {RaName, _} = amqqueue:get_pid(Q),
- ServerId = {RaName, Node},
- case members(Q) of
- [{_, Node}] ->
-
- %% deleting the last member is not allowed
- {error, last_node};
- Members ->
- case ra:remove_member(Members, ServerId) of
- {ok, _, _Leader} ->
- Fun = fun(Q1) ->
- update_type_state(
- Q1,
- fun(#{nodes := Nodes} = Ts) ->
- Ts#{nodes => lists:delete(Node, Nodes)}
- end)
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun() -> rabbit_amqqueue:update(QName, Fun) end),
- case ra:force_delete_server(ServerId) of
- ok ->
- ok;
- {error, {badrpc, nodedown}} ->
- ok;
- {error, {badrpc, {'EXIT', {badarg, _}}}} ->
- %% DETS/ETS tables can't be found, application isn't running
- ok;
- {error, _} = Err ->
- Err;
- Err ->
- {error, Err}
- end;
- {timeout, _} ->
- {error, timeout};
- E ->
- E
- end
- end.
-
--spec shrink_all(node()) ->
- [{rabbit_amqqueue:name(),
- {ok, pos_integer()} | {error, pos_integer(), term()}}].
-shrink_all(Node) ->
- [begin
- QName = amqqueue:get_name(Q),
- rabbit_log:info("~s: removing member (replica) on node ~w",
- [rabbit_misc:rs(QName), Node]),
- Size = length(get_nodes(Q)),
- case delete_member(Q, Node) of
- ok ->
- {QName, {ok, Size-1}};
- {error, Err} ->
- rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w",
- [rabbit_misc:rs(QName), Node, Err]),
- {QName, {error, Size, Err}}
- end
- end || Q <- rabbit_amqqueue:list(),
- amqqueue:get_type(Q) == ?MODULE,
- lists:member(Node, get_nodes(Q))].
-
--spec grow(node(), binary(), binary(), all | even) ->
- [{rabbit_amqqueue:name(),
- {ok, pos_integer()} | {error, pos_integer(), term()}}].
-grow(Node, VhostSpec, QueueSpec, Strategy) ->
- Running = rabbit_nodes:all_running(),
- [begin
- Size = length(get_nodes(Q)),
- QName = amqqueue:get_name(Q),
- rabbit_log:info("~s: adding a new member (replica) on node ~w",
- [rabbit_misc:rs(QName), Node]),
- case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of
- ok ->
- {QName, {ok, Size + 1}};
- {error, Err} ->
- rabbit_log:warning(
- "~s: failed to add member (replica) on node ~w, error: ~w",
- [rabbit_misc:rs(QName), Node, Err]),
- {QName, {error, Size, Err}}
- end
- end
- || Q <- rabbit_amqqueue:list(),
- amqqueue:get_type(Q) == ?MODULE,
- %% don't add a member if there is already one on the node
- not lists:member(Node, get_nodes(Q)),
- %% node needs to be running
- lists:member(Node, Running),
- matches_strategy(Strategy, get_nodes(Q)),
- is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
- is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
-
-transfer_leadership(Q, Destination) ->
- {RaName, _} = Pid = amqqueue:get_pid(Q),
- case ra:transfer_leadership(Pid, {RaName, Destination}) of
- ok ->
- case ra:members(Pid) of
- {_, _, {_, NewNode}} ->
- {migrated, NewNode};
- {timeout, _} ->
- {not_migrated, ra_members_timeout}
- end;
- already_leader ->
- {not_migrated, already_leader};
- {error, Reason} ->
- {not_migrated, Reason};
- {timeout, _} ->
- %% TODO should we retry once?
- {not_migrated, timeout}
- end.
-
-queue_length(Q) ->
- Name = amqqueue:get_name(Q),
- case ets:lookup(ra_metrics, Name) of
- [] -> 0;
- [{_, _, SnapIdx, _, _, LastIdx, _}] -> LastIdx - SnapIdx
- end.
-
-get_replicas(Q) ->
- get_nodes(Q).
-
-get_resource_name(#resource{name = Name}) ->
- Name.
-
-matches_strategy(all, _) -> true;
-matches_strategy(even, Members) ->
- length(Members) rem 2 == 0.
-
-is_match(Subj, E) ->
- nomatch /= re:run(Subj, E).
-
-file_handle_leader_reservation(QName) ->
- {ok, Q} = rabbit_amqqueue:lookup(QName),
- ClusterSize = length(get_nodes(Q)),
- file_handle_cache:set_reservation(2 + ClusterSize).
-
-file_handle_other_reservation() ->
- file_handle_cache:set_reservation(2).
-
-file_handle_release_reservation() ->
- file_handle_cache:release_reservation().
-
--spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}.
-reclaim_memory(Vhost, QueueName) ->
- QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} when ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported};
- {ok, Q} when ?amqqueue_is_quorum(Q) ->
- ok = ra:pipeline_command(amqqueue:get_pid(Q),
- rabbit_fifo:make_garbage_collection());
- {error, not_found} = E ->
- E
- end.
-
-%%----------------------------------------------------------------------------
-dlx_mfa(Q) ->
- DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
- fun res_arg/2, Q), Q),
- DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
- fun res_arg/2, Q),
- {?MODULE, dead_letter_publish, [DLX, DLXRKey, amqqueue:get_name(Q)]}.
-
-init_dlx(undefined, _Q) ->
- undefined;
-init_dlx(DLX, Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- rabbit_misc:r(QName, exchange, DLX).
-
-res_arg(_PolVal, ArgVal) -> ArgVal.
-
-dead_letter_publish(undefined, _, _, _) ->
- ok;
-dead_letter_publish(X, RK, QName, ReasonMsgs) ->
- case rabbit_exchange:lookup(X) of
- {ok, Exchange} ->
- [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
- || {Reason, Msg} <- ReasonMsgs];
- {error, not_found} ->
- ok
- end.
-
-find_quorum_queues(VHost) ->
- Node = node(),
- mnesia:async_dirty(
- fun () ->
- qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
- ?amqqueue_is_quorum(Q),
- amqqueue:get_vhost(Q) =:= VHost,
- amqqueue:qnode(Q) == Node]))
- end).
-
-i_totals(Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- case ets:lookup(queue_coarse_metrics, QName) of
- [{_, MR, MU, M, _}] ->
- [{messages_ready, MR},
- {messages_unacknowledged, MU},
- {messages, M}];
- [] ->
- [{messages_ready, 0},
- {messages_unacknowledged, 0},
- {messages, 0}]
- end.
-
-i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q);
-i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
-i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);
-i(arguments, Q) when ?is_amqqueue(Q) -> amqqueue:get_arguments(Q);
-i(pid, Q) when ?is_amqqueue(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- whereis(Name);
-i(messages, Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- quorum_messages(QName);
-i(messages_ready, Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- case ets:lookup(queue_coarse_metrics, QName) of
- [{_, MR, _, _, _}] ->
- MR;
- [] ->
- 0
- end;
-i(messages_unacknowledged, Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- case ets:lookup(queue_coarse_metrics, QName) of
- [{_, _, MU, _, _}] ->
- MU;
- [] ->
- 0
- end;
-i(policy, Q) ->
- case rabbit_policy:name(Q) of
- none -> '';
- Policy -> Policy
- end;
-i(operator_policy, Q) ->
- case rabbit_policy:name_op(Q) of
- none -> '';
- Policy -> Policy
- end;
-i(effective_policy_definition, Q) ->
- case rabbit_policy:effective_definition(Q) of
- undefined -> [];
- Def -> Def
- end;
-i(consumers, Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- case ets:lookup(queue_metrics, QName) of
- [{_, M, _}] ->
- proplists:get_value(consumers, M, 0);
- [] ->
- 0
- end;
-i(memory, Q) when ?is_amqqueue(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- try
- {memory, M} = process_info(whereis(Name), memory),
- M
- catch
- error:badarg ->
- 0
- end;
-i(state, Q) when ?is_amqqueue(Q) ->
- {Name, Node} = amqqueue:get_pid(Q),
- %% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
- {badrpc, _} -> down;
- State -> State
- end;
-i(local_state, Q) when ?is_amqqueue(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- case ets:lookup(ra_state, Name) of
- [{_, State}] -> State;
- _ -> not_member
- end;
-i(garbage_collection, Q) when ?is_amqqueue(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- try
- rabbit_misc:get_gc_info(whereis(Name))
- catch
- error:badarg ->
- []
- end;
-i(members, Q) when ?is_amqqueue(Q) ->
- get_nodes(Q);
-i(online, Q) -> online(Q);
-i(leader, Q) -> leader(Q);
-i(open_files, Q) when ?is_amqqueue(Q) ->
- {Name, _} = amqqueue:get_pid(Q),
- Nodes = get_nodes(Q),
- {Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
- lists:flatten(Data);
-i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of
- {ok, {_, {value, {_ConsumerTag, ChPid}}}, _} ->
- ChPid;
- {ok, _, _} ->
- '';
- {error, _} ->
- '';
- {timeout, _} ->
- ''
- end;
-i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- case ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1) of
- {ok, {_, {value, {ConsumerTag, _ChPid}}}, _} ->
- ConsumerTag;
- {ok, _, _} ->
- '';
- {error, _} ->
- '';
- {timeout, _} ->
- ''
- end;
-i(type, _) -> quorum;
-i(messages_ram, Q) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- case ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1) of
- {ok, {_, {Length, _}}, _} ->
- Length;
- {error, _} ->
- 0;
- {timeout, _} ->
- 0
- end;
-i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
- case ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1) of
- {ok, {_, {_, Bytes}}, _} ->
- Bytes;
- {error, _} ->
- 0;
- {timeout, _} ->
- 0
- end;
-i(_K, _Q) -> ''.
-
-open_files(Name) ->
- case whereis(Name) of
- undefined -> {node(), 0};
- Pid -> case ets:lookup(ra_open_file_metrics, Pid) of
- [] -> {node(), 0};
- [{_, Count}] -> {node(), Count}
- end
- end.
-
-leader(Q) when ?is_amqqueue(Q) ->
- {Name, Leader} = amqqueue:get_pid(Q),
- case is_process_alive(Name, Leader) of
- true -> Leader;
- false -> ''
- end.
-
-peek(Vhost, Queue, Pos) ->
- peek(Pos, rabbit_misc:r(Vhost, queue, Queue)).
-
-peek(Pos, #resource{} = QName) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} ->
- peek(Pos, Q);
- Err ->
- Err
- end;
-peek(Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_quorum(Q) ->
- LeaderPid = amqqueue:get_pid(Q),
- case ra:aux_command(LeaderPid, {peek, Pos}) of
- {ok, {MsgHeader, Msg0}} ->
- Count = case MsgHeader of
- #{delivery_count := C} -> C;
- _ -> 0
- end,
- Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long,
- Count, Msg0),
- {ok, rabbit_basic:peek_fmt_message(Msg)};
- {error, Err} ->
- {error, Err};
- Err ->
- Err
- end;
-peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
- {error, classic_queue_not_supported}.
-
-online(Q) when ?is_amqqueue(Q) ->
- Nodes = get_nodes(Q),
- {Name, _} = amqqueue:get_pid(Q),
- [Node || Node <- Nodes, is_process_alive(Name, Node)].
-
-format(Q) when ?is_amqqueue(Q) ->
- Nodes = get_nodes(Q),
- [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
-
-is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
-
--spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().
-
-quorum_messages(QName) ->
- case ets:lookup(queue_coarse_metrics, QName) of
- [{_, _, _, M, _}] ->
- M;
- [] ->
- 0
- end.
-
-quorum_ctag(Int) when is_integer(Int) ->
- integer_to_binary(Int);
-quorum_ctag(Other) ->
- Other.
-
-maybe_send_reply(_ChPid, undefined) -> ok;
-maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-
-queue_name(RaFifoState) ->
- rabbit_fifo_client:cluster_name(RaFifoState).
-
-get_default_quorum_initial_group_size(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>) of
- undefined -> application:get_env(rabbit, default_quorum_initial_group_size);
- {_Type, Val} -> Val
- end.
-
-select_quorum_nodes(Size, All) when length(All) =< Size ->
- All;
-select_quorum_nodes(Size, All) ->
- Node = node(),
- case lists:member(Node, All) of
- true ->
- select_quorum_nodes(Size - 1, lists:delete(Node, All), [Node]);
- false ->
- select_quorum_nodes(Size, All, [])
- end.
-
-select_quorum_nodes(0, _, Selected) ->
- Selected;
-select_quorum_nodes(Size, Rest, Selected) ->
- S = lists:nth(rand:uniform(length(Rest)), Rest),
- select_quorum_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
-
-%% member with the current leader first
-members(Q) when ?amqqueue_is_quorum(Q) ->
- {RaName, LeaderNode} = amqqueue:get_pid(Q),
- Nodes = lists:delete(LeaderNode, get_nodes(Q)),
- [{RaName, N} || N <- [LeaderNode | Nodes]].
-
-format_ra_event(ServerId, Evt, QRef) ->
- {'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
-
-make_ra_conf(Q, ServerId, TickTimeout) ->
- QName = amqqueue:get_name(Q),
- RaMachine = ra_machine(Q),
- [{ClusterName, _} | _] = Members = members(Q),
- UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
- FName = rabbit_misc:rs(QName),
- Formatter = {?MODULE, format_ra_event, [QName]},
- #{cluster_name => ClusterName,
- id => ServerId,
- uid => UId,
- friendly_name => FName,
- metrics_key => QName,
- initial_members => Members,
- log_init_args => #{uid => UId},
- tick_timeout => TickTimeout,
- machine => RaMachine,
- ra_event_formatter => Formatter}.
-
-get_nodes(Q) when ?is_amqqueue(Q) ->
- #{nodes := Nodes} = amqqueue:get_type_state(Q),
- Nodes.
-
-update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
- Ts = amqqueue:get_type_state(Q),
- amqqueue:set_type_state(Q, Fun(Ts)).
-
-overflow(undefined, Def, _QName) -> Def;
-overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish;
-overflow(<<"drop-head">>, _Def, _QName) -> drop_head;
-overflow(<<"reject-publish-dlx">> = V, Def, QName) ->
- rabbit_log:warning("Invalid overflow strategy ~p for quorum queue: ~p",
- [V, rabbit_misc:rs(QName)]),
- Def.