diff options
46 files changed, 3022 insertions, 858 deletions
diff --git a/include/amqqueue.hrl b/include/amqqueue.hrl new file mode 100644 index 0000000000..d08a740d95 --- /dev/null +++ b/include/amqqueue.hrl @@ -0,0 +1,113 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved. +%% + +-include("amqqueue_v1.hrl"). +-include("amqqueue_v2.hrl"). + +-define(is_amqqueue(Q), + (?is_amqqueue_v2(Q) orelse + ?is_amqqueue_v1(Q))). + +-define(amqqueue_is_auto_delete(Q), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_auto_delete(Q) =:= true) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_field_auto_delete(Q) =:= true))). + +-define(amqqueue_is_durable(Q), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_durable(Q) =:= true) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_field_durable(Q) =:= true))). + +-define(amqqueue_exclusive_owner_is(Q, Owner), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_field_exclusive_owner(Q) =:= Owner))). + +-define(amqqueue_exclusive_owner_is_pid(Q), + ((?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_exclusive_owner(Q))) orelse + (?is_amqqueue_v1(Q) andalso + is_pid(?amqqueue_v1_field_exclusive_owner(Q))))). + +-define(amqqueue_state_is(Q, State), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_state(Q) =:= State) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_field_state(Q) =:= State))). + +-define(amqqueue_v1_type, classic). + +-define(amqqueue_is_classic(Q), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_type(Q) =:= classic) orelse + ?is_amqqueue_v1(Q))). + +-define(amqqueue_is_quorum(Q), + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_type(Q) =:= quorum) orelse + false). + +-define(amqqueue_has_valid_pid(Q), + ((?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_pid(Q))) orelse + (?is_amqqueue_v1(Q) andalso + is_pid(?amqqueue_v1_field_pid(Q))))). + +-define(amqqueue_pid_runs_on_local_node(Q), + ((?is_amqqueue_v2(Q) andalso + node(?amqqueue_v2_field_pid(Q)) =:= node()) orelse + (?is_amqqueue_v1(Q) andalso + node(?amqqueue_v1_field_pid(Q)) =:= node()))). + +-define(amqqueue_pid_equals(Q, Pid), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_pid(Q) =:= Pid) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_field_pid(Q) =:= Pid))). + +-define(amqqueue_pids_are_equal(Q0, Q1), + ((?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso + ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1)) orelse + (?is_amqqueue_v1(Q0) andalso ?is_amqqueue_v1(Q1) andalso + ?amqqueue_v1_field_pid(Q0) =:= ?amqqueue_v1_field_pid(Q1)))). + +-define(amqqueue_field_name(Q), + case ?is_amqqueue_v2(Q) of + true -> ?amqqueue_v2_field_name(Q); + false -> case ?is_amqqueue_v1(Q) of + true -> ?amqqueue_v1_field_name(Q) + end + end). + +-define(amqqueue_field_pid(Q), + case ?is_amqqueue_v2(Q) of + true -> ?amqqueue_v2_field_pid(Q); + false -> case ?is_amqqueue_v1(Q) of + true -> ?amqqueue_v1_field_pid(Q) + end + end). + +-define(amqqueue_v1_vhost(Q), element(2, ?amqqueue_v1_field_name(Q))). +-define(amqqueue_v2_vhost(Q), element(2, ?amqqueue_v2_field_name(Q))). + +-define(amqqueue_vhost_equals(Q, VHost), + ((?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_vhost(Q) =:= VHost) orelse + (?is_amqqueue_v1(Q) andalso + ?amqqueue_v1_vhost(Q) =:= VHost))). diff --git a/include/amqqueue_v1.hrl b/include/amqqueue_v1.hrl new file mode 100644 index 0000000000..04b2d72850 --- /dev/null +++ b/include/amqqueue_v1.hrl @@ -0,0 +1,20 @@ +-define(is_amqqueue_v1(Q), is_record(Q, amqqueue, 19)). + +-define(amqqueue_v1_field_name(Q), element(2, Q)). +-define(amqqueue_v1_field_durable(Q), element(3, Q)). +-define(amqqueue_v1_field_auto_delete(Q), element(4, Q)). +-define(amqqueue_v1_field_exclusive_owner(Q), element(5, Q)). +-define(amqqueue_v1_field_arguments(Q), element(6, Q)). +-define(amqqueue_v1_field_pid(Q), element(7, Q)). +-define(amqqueue_v1_field_slave_pids(Q), element(8, Q)). +-define(amqqueue_v1_field_sync_slave_pids(Q), element(9, Q)). +-define(amqqueue_v1_field_recoverable_slaves(Q), element(10, Q)). +-define(amqqueue_v1_field_policy(Q), element(11, Q)). +-define(amqqueue_v1_field_operator_policy(Q), element(12, Q)). +-define(amqqueue_v1_field_gm_pids(Q), element(13, Q)). +-define(amqqueue_v1_field_decorators(Q), element(14, Q)). +-define(amqqueue_v1_field_state(Q), element(15, Q)). +-define(amqqueue_v1_field_policy_version(Q), element(16, Q)). +-define(amqqueue_v1_field_slave_pids_pending_shutdown(Q), element(17, Q)). +-define(amqqueue_v1_field_vhost(Q), element(18, Q)). +-define(amqqueue_v1_field_options(Q), element(19, Q)). diff --git a/include/amqqueue_v2.hrl b/include/amqqueue_v2.hrl new file mode 100644 index 0000000000..37cd7ba2a8 --- /dev/null +++ b/include/amqqueue_v2.hrl @@ -0,0 +1,22 @@ +-define(is_amqqueue_v2(Q), is_record(Q, amqqueue, 21)). + +-define(amqqueue_v2_field_name(Q), element(2, Q)). +-define(amqqueue_v2_field_durable(Q), element(3, Q)). +-define(amqqueue_v2_field_auto_delete(Q), element(4, Q)). +-define(amqqueue_v2_field_exclusive_owner(Q), element(5, Q)). +-define(amqqueue_v2_field_arguments(Q), element(6, Q)). +-define(amqqueue_v2_field_pid(Q), element(7, Q)). +-define(amqqueue_v2_field_slave_pids(Q), element(8, Q)). +-define(amqqueue_v2_field_sync_slave_pids(Q), element(9, Q)). +-define(amqqueue_v2_field_recoverable_slaves(Q), element(10, Q)). +-define(amqqueue_v2_field_policy(Q), element(11, Q)). +-define(amqqueue_v2_field_operator_policy(Q), element(12, Q)). +-define(amqqueue_v2_field_gm_pids(Q), element(13, Q)). +-define(amqqueue_v2_field_decorators(Q), element(14, Q)). +-define(amqqueue_v2_field_state(Q), element(15, Q)). +-define(amqqueue_v2_field_policy_version(Q), element(16, Q)). +-define(amqqueue_v2_field_slave_pids_pending_shutdown(Q), element(17, Q)). +-define(amqqueue_v2_field_vhost(Q), element(18, Q)). +-define(amqqueue_v2_field_options(Q), element(19, Q)). +-define(amqqueue_v2_field_type(Q), element(20, Q)). +-define(amqqueue_v2_field_quorum_nodes(Q), element(21, Q)). diff --git a/src/amqqueue.erl b/src/amqqueue.erl new file mode 100644 index 0000000000..83b65cd048 --- /dev/null +++ b/src/amqqueue.erl @@ -0,0 +1,682 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(amqqueue). %% Could become amqqueue_v2 in the future. + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). + +-export([new/9, + new_with_version/10, + fields/0, + fields/1, + field_vhost/0, + record_version_to_use/0, + upgrade/1, + upgrade_to/2, + % arguments + get_arguments/1, + set_arguments/2, + % decorators + get_decorators/1, + set_decorators/2, + % exclusive_owner + get_exclusive_owner/1, + % gm_pids + get_gm_pids/1, + set_gm_pids/2, + get_leader/1, + % name (#resource) + get_name/1, + set_name/2, + % operator_policy + get_operator_policy/1, + set_operator_policy/2, + get_options/1, + % pid + get_pid/1, + set_pid/2, + % policy + get_policy/1, + set_policy/2, + % policy_version + get_policy_version/1, + set_policy_version/2, + % quorum_nodes + get_quorum_nodes/1, + set_quorum_nodes/2, + % recoverable_slaves + get_recoverable_slaves/1, + set_recoverable_slaves/2, + % slave_pids + get_slave_pids/1, + set_slave_pids/2, + % slave_pids_pending_shutdown + get_slave_pids_pending_shutdown/1, + set_slave_pids_pending_shutdown/2, + % state + get_state/1, + set_state/2, + % sync_slave_pids + get_sync_slave_pids/1, + set_sync_slave_pids/2, + get_type/1, + get_vhost/1, + is_amqqueue/1, + is_auto_delete/1, + is_durable/1, + is_classic/1, + is_quorum/1, + pattern_match_all/0, + pattern_match_on_name/1, + pattern_match_on_type/1, + reset_mirroring_and_decorators/1, + set_immutable/1, + qnode/1, + macros/0]). + +-define(record_version, amqqueue_v2). + +-record(amqqueue, { + name :: rabbit_amqqueue:name() | '_', %% immutable + durable :: boolean() | '_', %% immutable + auto_delete :: boolean() | '_', %% immutable + exclusive_owner = none :: pid() | none | '_', %% immutable + arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable + pid :: pid() | ra_server_id() | none | '_', %% durable (just so we + %% know home node) + slave_pids = [] :: [pid()] | none | '_', %% transient + sync_slave_pids = [] :: [pid()] | none| '_',%% transient + recoverable_slaves = [] :: [atom()] | none | '_', %% durable + policy :: binary() | none | undefined | '_', %% durable, implicit + %% update as above + operator_policy :: binary() | none | undefined | '_', %% durable, + %% implicit + %% update + %% as above + gm_pids = [] :: [pid()] | none | '_', %% transient + decorators :: [atom()] | none | undefined | '_', %% transient, + %% recalculated + %% as above + state = live :: atom() | none | '_', %% durable (have we crashed?) + policy_version = 0 :: non_neg_integer() | '_', + slave_pids_pending_shutdown = [] :: [pid()] | '_', + vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index + options = #{} :: map() | '_', + type = ?amqqueue_v1_type :: atom() | '_', + quorum_nodes = [] :: [node()] | '_' + }). + +-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2(). +-type amqqueue_v2() :: #amqqueue{ + name :: rabbit_amqqueue:name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: pid() | none, + arguments :: rabbit_framing:amqp_table(), + pid :: pid() | ra_server_id() | none, + slave_pids :: [pid()] | none, + sync_slave_pids :: [pid()] | none, + recoverable_slaves :: [atom()] | none, + policy :: binary() | none | undefined, + operator_policy :: binary() | none | undefined, + gm_pids :: [pid()] | none, + decorators :: [atom()] | none | undefined, + state :: atom() | none, + policy_version :: non_neg_integer(), + slave_pids_pending_shutdown :: [pid()], + vhost :: rabbit_types:vhost() | undefined, + options :: map(), + type :: atom(), + quorum_nodes :: [node()] + }. + +-type ra_server_id() :: {Name :: atom(), Node :: node()}. + +-type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() | + amqqueue_v2_pattern(). +-type amqqueue_v2_pattern() :: #amqqueue{ + name :: rabbit_amqqueue:name() | '_', + durable :: '_', + auto_delete :: '_', + exclusive_owner :: '_', + arguments :: '_', + pid :: '_', + slave_pids :: '_', + sync_slave_pids :: '_', + recoverable_slaves :: '_', + policy :: '_', + operator_policy :: '_', + gm_pids :: '_', + decorators :: '_', + state :: '_', + policy_version :: '_', + slave_pids_pending_shutdown :: '_', + vhost :: '_', + options :: '_', + type :: atom() | '_', + quorum_nodes :: '_' + }. + +-export_type([amqqueue/0, + amqqueue_v2/0, + amqqueue_pattern/0, + amqqueue_v2_pattern/0, + ra_server_id/0]). + +-spec new(rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + atom()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) andalso + is_atom(Type) -> + case record_version_to_use() of + ?record_version -> + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type); + _ -> + amqqueue_v1:new( + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + end. + +-spec new_with_version +(amqqueue_v1 | amqqueue_v2, + rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + atom()) -> amqqueue(). + +new_with_version(?record_version, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) andalso + is_atom(Type) -> + #amqqueue{name = Name, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = Pid, + vhost = VHost, + options = Options, + type = Type}; +new_with_version(Version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type) -> + amqqueue_v1:new_with_version( + Version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + +-spec is_amqqueue(any()) -> boolean(). + +is_amqqueue(#amqqueue{}) -> true; +is_amqqueue(Queue) -> amqqueue_v1:is_amqqueue(Queue). + +-spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2. + +record_version_to_use() -> + case rabbit_feature_flags:is_enabled(quorum_queue) of + true -> ?record_version; + false -> amqqueue_v1:record_version_to_use() + end. + +-spec upgrade(amqqueue()) -> amqqueue(). + +upgrade(#amqqueue{} = Queue) -> Queue; +upgrade(OldQueue) -> upgrade_to(record_version_to_use(), OldQueue). + +-spec upgrade_to +(amqqueue_v2, amqqueue()) -> amqqueue_v2(); +(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1(). + +upgrade_to(?record_version, #amqqueue{} = Queue) -> + Queue; +upgrade_to(?record_version, OldQueue) -> + Fields = erlang:tuple_to_list(OldQueue) ++ [?amqqueue_v1_type, + undefined], + #amqqueue{} = erlang:list_to_tuple(Fields); +upgrade_to(Version, OldQueue) -> + amqqueue_v1:upgrade_to(Version, OldQueue). + +% arguments + +-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). + +get_arguments(#amqqueue{arguments = Args}) -> + Args; +get_arguments(Queue) -> + amqqueue_v1:get_arguments(Queue). + +-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). + +set_arguments(#amqqueue{} = Queue, Args) -> + Queue#amqqueue{arguments = Args}; +set_arguments(Queue, Args) -> + amqqueue_v1:set_arguments(Queue, Args). + +% decorators + +-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. + +get_decorators(#amqqueue{decorators = Decorators}) -> + Decorators; +get_decorators(Queue) -> + amqqueue_v1:get_decorators(Queue). + +-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). + +set_decorators(#amqqueue{} = Queue, Decorators) -> + Queue#amqqueue{decorators = Decorators}; +set_decorators(Queue, Decorators) -> + amqqueue_v1:set_decorators(Queue, Decorators). + +-spec get_exclusive_owner(amqqueue()) -> pid() | none. + +get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> + Owner; +get_exclusive_owner(Queue) -> + amqqueue_v1:get_exclusive_owner(Queue). + +-spec get_gm_pids(amqqueue()) -> [pid()] | none. + +get_gm_pids(#amqqueue{gm_pids = GMPids}) -> + GMPids; +get_gm_pids(Queue) -> + amqqueue_v1:get_gm_pids(Queue). + +-spec set_gm_pids(amqqueue(), [pid()] | none) -> amqqueue(). + +set_gm_pids(#amqqueue{} = Queue, GMPids) -> + Queue#amqqueue{gm_pids = GMPids}; +set_gm_pids(Queue, GMPids) -> + amqqueue_v1:set_gm_pids(Queue, GMPids). + +-spec get_leader(amqqueue_v2()) -> node(). + +get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader. + +% operator_policy + +-spec get_operator_policy(amqqueue()) -> binary() | none | undefined. + +get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy; +get_operator_policy(Queue) -> amqqueue_v1:get_operator_policy(Queue). + +-spec set_operator_policy(amqqueue(), binary() | none | undefined) -> + amqqueue(). + +set_operator_policy(#amqqueue{} = Queue, Policy) -> + Queue#amqqueue{operator_policy = Policy}; +set_operator_policy(Queue, Policy) -> + amqqueue_v1:set_operator_policy(Queue, Policy). + +% name + +-spec get_name(amqqueue()) -> rabbit_amqqueue:name(). + +get_name(#amqqueue{name = Name}) -> Name; +get_name(Queue) -> amqqueue_v1:get_name(Queue). + +-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). + +set_name(#amqqueue{} = Queue, Name) -> + Queue#amqqueue{name = Name}; +set_name(Queue, Name) -> + amqqueue_v1:set_name(Queue, Name). + +-spec get_options(amqqueue()) -> map(). + +get_options(#amqqueue{options = Options}) -> Options; +get_options(Queue) -> amqqueue_v1:get_options(Queue). + +% pid + +-spec get_pid +(amqqueue_v2()) -> pid() | ra_server_id() | none; +(amqqueue_v1:amqqueue_v1()) -> pid() | none. + +get_pid(#amqqueue{pid = Pid}) -> Pid; +get_pid(Queue) -> amqqueue_v1:get_pid(Queue). + +-spec set_pid +(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); +(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). + +set_pid(#amqqueue{} = Queue, Pid) -> + Queue#amqqueue{pid = Pid}; +set_pid(Queue, Pid) -> + amqqueue_v1:set_pid(Queue, Pid). + +% policy + +-spec get_policy(amqqueue()) -> binary() | none | undefined. + +get_policy(#amqqueue{policy = Policy}) -> Policy; +get_policy(Queue) -> amqqueue_v1:get_policy(Queue). + +-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). + +set_policy(#amqqueue{} = Queue, Policy) -> + Queue#amqqueue{policy = Policy}; +set_policy(Queue, Policy) -> + amqqueue_v1:set_policy(Queue, Policy). + +% policy_version + +-spec get_policy_version(amqqueue()) -> non_neg_integer(). + +get_policy_version(#amqqueue{policy_version = PV}) -> + PV; +get_policy_version(Queue) -> + amqqueue_v1:get_policy_version(Queue). + +-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). + +set_policy_version(#amqqueue{} = Queue, PV) -> + Queue#amqqueue{policy_version = PV}; +set_policy_version(Queue, PV) -> + amqqueue_v1:set_policy_version(Queue, PV). + +% recoverable_slaves + +-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. + +get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> + Slaves; +get_recoverable_slaves(Queue) -> + amqqueue_v1:get_recoverable_slaves(Queue). + +-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). + +set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> + Queue#amqqueue{recoverable_slaves = Slaves}; +set_recoverable_slaves(Queue, Slaves) -> + amqqueue_v1:set_recoverable_slaves(Queue, Slaves). + +% quorum_nodes (new in v2) + +-spec get_quorum_nodes(amqqueue()) -> [node()]. + +get_quorum_nodes(#amqqueue{quorum_nodes = Nodes}) -> Nodes; +get_quorum_nodes(_) -> []. + +-spec set_quorum_nodes(amqqueue(), [node()]) -> amqqueue(). + +set_quorum_nodes(#amqqueue{} = Queue, Nodes) -> + Queue#amqqueue{quorum_nodes = Nodes}; +set_quorum_nodes(Queue, _Nodes) -> + Queue. + +% slave_pids + +-spec get_slave_pids(amqqueue()) -> [pid()] | none. + +get_slave_pids(#amqqueue{slave_pids = Slaves}) -> + Slaves; +get_slave_pids(Queue) -> + amqqueue_v1:get_slave_pids(Queue). + +-spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). + +set_slave_pids(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids = SlavePids}; +set_slave_pids(Queue, SlavePids) -> + amqqueue_v1:set_slave_pids(Queue, SlavePids). + +% slave_pids_pending_shutdown + +-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()]. + +get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> + Slaves; +get_slave_pids_pending_shutdown(Queue) -> + amqqueue_v1:get_slave_pids_pending_shutdown(Queue). + +-spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue(). + +set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}; +set_slave_pids_pending_shutdown(Queue, SlavePids) -> + amqqueue_v1:set_slave_pids_pending_shutdown(Queue, SlavePids). + +% state + +-spec get_state(amqqueue()) -> atom() | none. + +get_state(#amqqueue{state = State}) -> State; +get_state(Queue) -> amqqueue_v1:get_state(Queue). + +-spec set_state(amqqueue(), atom() | none) -> amqqueue(). + +set_state(#amqqueue{} = Queue, State) -> + Queue#amqqueue{state = State}; +set_state(Queue, State) -> + amqqueue_v1:set_state(Queue, State). + +% sync_slave_pids + +-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. + +get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> + Pids; +get_sync_slave_pids(Queue) -> + amqqueue_v1:get_sync_slave_pids(Queue). + +-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). + +set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> + Queue#amqqueue{sync_slave_pids = Pids}; +set_sync_slave_pids(Queue, Pids) -> + amqqueue_v1:set_sync_slave_pids(Queue, Pids). + +%% New in v2. + +-spec get_type(amqqueue()) -> atom(). + +get_type(#amqqueue{type = Type}) -> Type; +get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. + +-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. + +get_vhost(#amqqueue{vhost = VHost}) -> VHost; +get_vhost(Queue) -> amqqueue_v1:get_vhost(Queue). + +-spec is_auto_delete(amqqueue()) -> boolean(). + +is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> + AutoDelete; +is_auto_delete(Queue) -> + amqqueue_v1:is_auto_delete(Queue). + +-spec is_durable(amqqueue()) -> boolean(). + +is_durable(#amqqueue{durable = Durable}) -> Durable; +is_durable(Queue) -> amqqueue_v1:is_durable(Queue). + +-spec is_classic(amqqueue()) -> boolean(). + +is_classic(Queue) -> + get_type(Queue) =:= ?amqqueue_v1_type. + +-spec is_quorum(amqqueue()) -> boolean(). + +is_quorum(Queue) -> + get_type(Queue) =:= quorum. + +fields() -> + case record_version_to_use() of + ?record_version -> fields(?record_version); + _ -> amqqueue_v1:fields() + end. + +fields(?record_version) -> record_info(fields, amqqueue); +fields(Version) -> amqqueue_v1:fields(Version). + +field_vhost() -> + case record_version_to_use() of + ?record_version -> #amqqueue.vhost; + _ -> amqqueue_v1:field_vhost() + end. + +-spec pattern_match_all() -> amqqueue_pattern(). + +pattern_match_all() -> + case record_version_to_use() of + ?record_version -> #amqqueue{_ = '_'}; + _ -> amqqueue_v1:pattern_match_all() + end. + +-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern(). + +pattern_match_on_name(Name) -> + case record_version_to_use() of + ?record_version -> #amqqueue{name = Name, _ = '_'}; + _ -> amqqueue_v1:pattern_match_on_name(Name) + end. + +-spec pattern_match_on_type(atom()) -> amqqueue_pattern(). + +pattern_match_on_type(Type) -> + case record_version_to_use() of + ?record_version -> #amqqueue{type = Type, _ = '_'}; + _ when Type =:= classic -> amqqueue_v1:pattern_match_all(); + %% FIXME: We try a pattern which should never match when the + %% `quorum_queue` feature flag is not enabled yet. Is there + %% a better solution? + _ -> amqqueue_v1:pattern_match_on_name( + rabbit_misc:r(<<0>>, queue, <<0>>)) + end. + +-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue(). + +reset_mirroring_and_decorators(#amqqueue{} = Queue) -> + Queue#amqqueue{slave_pids = [], + sync_slave_pids = [], + gm_pids = [], + decorators = undefined}; +reset_mirroring_and_decorators(Queue) -> + amqqueue_v1:reset_mirroring_and_decorators(Queue). + +-spec set_immutable(amqqueue()) -> amqqueue(). + +set_immutable(#amqqueue{} = Queue) -> + Queue#amqqueue{pid = none, + slave_pids = [], + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}; +set_immutable(Queue) -> + amqqueue_v1:set_immutable(Queue). + +-spec qnode(amqqueue() | pid() | ra_server_id()) -> node(). + +qnode(Queue) when ?is_amqqueue(Queue) -> + QPid = get_pid(Queue), + qnode(QPid); +qnode(QPid) when is_pid(QPid) -> + node(QPid); +qnode({_, Node}) -> + Node. + +% private + +macros() -> + io:format( + "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", + [?record_version, record_info(size, amqqueue)]), + %% The field number starts at 2 because the first element is the + %% record name. + macros(record_info(fields, amqqueue), 2). + +macros([Field | Rest], I) -> + io:format( + "-define(~s_field_~s(Q), element(~b, Q)).~n", + [?record_version, Field, I]), + macros(Rest, I + 1); +macros([], _) -> + ok. diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl new file mode 100644 index 0000000000..0b35418d1d --- /dev/null +++ b/src/amqqueue_v1.erl @@ -0,0 +1,403 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(amqqueue_v1). + +-include_lib("rabbit_common/include/resource.hrl"). + +-export([new/8, + new_with_version/9, + fields/0, + fields/1, + field_vhost/0, + record_version_to_use/0, + upgrade/1, + upgrade_to/2, + % arguments + get_arguments/1, + set_arguments/2, + % decorators + get_decorators/1, + set_decorators/2, + % exclusive_owner + get_exclusive_owner/1, + % gm_pids + get_gm_pids/1, + set_gm_pids/2, + % name + get_name/1, + set_name/2, + % operator_policy + get_operator_policy/1, + set_operator_policy/2, + get_options/1, + % pid + get_pid/1, + set_pid/2, + % policy + get_policy/1, + set_policy/2, + % policy_version + get_policy_version/1, + set_policy_version/2, + % recoverable_slaves + get_recoverable_slaves/1, + set_recoverable_slaves/2, + % slave_pids + get_slave_pids/1, + set_slave_pids/2, + % slave_pids_pending_shutdown + get_slave_pids_pending_shutdown/1, + set_slave_pids_pending_shutdown/2, + % state + get_state/1, + set_state/2, + % sync_slave_pids + get_sync_slave_pids/1, + set_sync_slave_pids/2, + get_vhost/1, + is_amqqueue/1, + is_auto_delete/1, + is_durable/1, + pattern_match_all/0, + pattern_match_on_name/1, + reset_mirroring_and_decorators/1, + set_immutable/1, + macros/0]). + +-define(record_version, ?MODULE). + +-record(amqqueue, { + name :: rabbit_amqqueue:name() | '_', %% immutable + durable :: boolean() | '_', %% immutable + auto_delete :: boolean() | '_', %% immutable + exclusive_owner = none :: pid() | none | '_', %% immutable + arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable + pid :: pid() | none | '_', %% durable (just so we + %% know home node) + slave_pids = [] :: [pid()] | none | '_', %% transient + sync_slave_pids = [] :: [pid()] | none| '_',%% transient + recoverable_slaves = [] :: [atom()] | none | '_', %% durable + policy :: binary() | none | undefined | '_', %% durable, implicit + %% update as above + operator_policy :: binary() | none | undefined | '_', %% durable, + %% implicit + %% update + %% as above + gm_pids = [] :: [pid()] | none | '_', %% transient + decorators :: [atom()] | none | undefined | '_', %% transient, + %% recalculated + %% as above + state = live :: atom() | none | '_', %% durable (have we crashed?) + policy_version = 0 :: non_neg_integer() | '_', + slave_pids_pending_shutdown = [] :: [pid()] | '_', + vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index + options = #{} :: map() | '_' + }). + +-type amqqueue() :: amqqueue_v1(). +-type amqqueue_v1() :: #amqqueue{ + name :: rabbit_amqqueue:name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: pid() | none, + arguments :: rabbit_framing:amqp_table(), + pid :: pid() | none, + slave_pids :: [pid()] | none, + sync_slave_pids :: [pid()] | none, + recoverable_slaves :: [atom()] | none, + policy :: binary() | none | undefined, + operator_policy :: binary() | none | undefined, + gm_pids :: [pid()] | none, + decorators :: [atom()] | none | undefined, + state :: atom() | none, + policy_version :: non_neg_integer(), + slave_pids_pending_shutdown :: [pid()], + vhost :: rabbit_types:vhost() | undefined, + options :: map() + }. + +-type amqqueue_pattern() :: amqqueue_v1_pattern(). +-type amqqueue_v1_pattern() :: #amqqueue{ + name :: rabbit_amqqueue:name() | '_', + durable :: '_', + auto_delete :: '_', + exclusive_owner :: '_', + arguments :: '_', + pid :: '_', + slave_pids :: '_', + sync_slave_pids :: '_', + recoverable_slaves :: '_', + policy :: '_', + operator_policy :: '_', + gm_pids :: '_', + decorators :: '_', + state :: '_', + policy_version :: '_', + slave_pids_pending_shutdown :: '_', + vhost :: '_', + options :: '_' + }. + +-export_type([amqqueue/0, + amqqueue_v1/0, + amqqueue_pattern/0, + amqqueue_v1_pattern/0]). + +-spec new(rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + +-spec new_with_version(amqqueue_v1, + rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new_with_version(?record_version, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + #amqqueue{name = Name, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = Pid, + vhost = VHost, + options = Options}. + +-spec is_amqqueue(any()) -> boolean(). + +is_amqqueue(#amqqueue{}) -> true; +is_amqqueue(_) -> false. + +-spec record_version_to_use() -> amqqueue_v1. + +record_version_to_use() -> + ?record_version. + +-spec upgrade(amqqueue()) -> amqqueue(). + +upgrade(#amqqueue{} = Queue) -> + Queue. + +-spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). + +upgrade_to(?record_version, #amqqueue{} = Queue) -> + Queue. + +% arguments + +-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). + +get_arguments(#amqqueue{arguments = Args}) -> Args. + +-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). + +set_arguments(#amqqueue{} = Queue, Args) -> + Queue#amqqueue{arguments = Args}. + +% decorators + +get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. + +set_decorators(#amqqueue{} = Queue, Decorators) -> + Queue#amqqueue{decorators = Decorators}. + +get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. + +% gm_pids + +get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. + +set_gm_pids(#amqqueue{} = Queue, GMPids) -> + Queue#amqqueue{gm_pids = GMPids}. + +% name + +get_name(#amqqueue{name = Name}) -> Name. + +set_name(#amqqueue{} = Queue, Name) -> + Queue#amqqueue{name = Name}. + +% operator_policy + +get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. + +set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> + Queue#amqqueue{operator_policy = OpPolicy}. + +get_options(#amqqueue{options = Options}) -> Options. + +% pid + +get_pid(#amqqueue{pid = Pid}) -> Pid. + +set_pid(#amqqueue{} = Queue, Pid) -> + Queue#amqqueue{pid = Pid}. + +% policy + +get_policy(#amqqueue{policy = Policy}) -> Policy. + +set_policy(#amqqueue{} = Queue, Policy) -> + Queue#amqqueue{policy = Policy}. + +% policy_version + +get_policy_version(#amqqueue{policy_version = PV}) -> + PV. + +set_policy_version(#amqqueue{} = Queue, PV) -> + Queue#amqqueue{policy_version = PV}. + +% recoverable_slaves + +get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> + Slaves. + +set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> + Queue#amqqueue{recoverable_slaves = Slaves}. + +% slave_pids + +get_slave_pids(#amqqueue{slave_pids = Slaves}) -> + Slaves. + +set_slave_pids(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids = SlavePids}. + +% slave_pids_pending_shutdown + +get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> + Slaves. + +set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. + +% state + +get_state(#amqqueue{state = State}) -> State. + +set_state(#amqqueue{} = Queue, State) -> Queue#amqqueue{state = State}. + +% sync_slave_pids + +get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> Pids. + +set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> + Queue#amqqueue{sync_slave_pids = Pids}. + +get_vhost(#amqqueue{vhost = VHost}) -> VHost. + +is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. + +is_durable(#amqqueue{durable = Durable}) -> Durable. + +fields() -> fields(?record_version). + +fields(?record_version) -> record_info(fields, amqqueue). + +field_vhost() -> #amqqueue.vhost. + +-spec pattern_match_all() -> amqqueue_pattern(). + +pattern_match_all() -> #amqqueue{_ = '_'}. + +-spec pattern_match_on_name(rabbit_amqqueue:name()) -> + amqqueue_pattern(). + +pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. + +reset_mirroring_and_decorators(#amqqueue{} = Queue) -> + Queue#amqqueue{slave_pids = [], + sync_slave_pids = [], + gm_pids = [], + decorators = undefined}. + +set_immutable(#amqqueue{} = Queue) -> + Queue#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}. + +macros() -> + io:format( + "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", + [?record_version, record_info(size, amqqueue)]), + %% The field number starts at 2 because the first element is the + %% record name. + macros(record_info(fields, amqqueue), 2). + +macros([Field | Rest], I) -> + io:format( + "-define(~s_field_~s(Q), element(~b, Q)).~n", + [?record_version, Field, I]), + macros(Rest, I + 1); +macros([], _) -> + ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 63d89ff4a7..ced59a8c62 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -21,11 +21,12 @@ delete_immediately/1, delete_exclusive/2, delete/4, purge/1, forget_all_durable/1, delete_crashed/1, delete_crashed/2, delete_crashed_internal/2]). --export([pseudo_queue/2, immutable/1]). +-export([pseudo_queue/2, pseudo_queue/3, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver/3, requeue/4, ack/4, reject/5]). +-export([not_found/1, absent/2]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). @@ -56,6 +57,7 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-include("amqqueue.hrl"). -define(INTEGER_ARG_TYPES, [byte, short, signedint, long, unsignedbyte, unsignedshort, unsignedint]). @@ -71,99 +73,102 @@ -type name() :: rabbit_types:r('queue'). -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). --type qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return()). +-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). -type qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}. -type msg_id() :: non_neg_integer(). -type ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}. -type absent_reason() :: 'nodedown' | 'crashed'. --type queue_or_absent() :: rabbit_types:amqqueue() | - {'absent', rabbit_types:amqqueue(),absent_reason()}. +-type queue_or_absent() :: amqqueue:amqqueue() | + {'absent', amqqueue:amqqueue(),absent_reason()}. -type not_found_or_absent() :: - 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}. --spec recover(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. + 'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}. +-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. -spec stop(rabbit_types:vhost()) -> 'ok'. --spec start([rabbit_types:amqqueue()]) -> 'ok'. +-spec start([amqqueue:amqqueue()]) -> 'ok'. -spec declare (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), rabbit_types:username()) -> {'new' | 'existing' | 'absent' | 'owner_died', - rabbit_types:amqqueue()} | - {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()} | + amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | rabbit_types:channel_exit(). -spec declare (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), rabbit_types:username(), node()) -> - {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | - {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()} | - {'absent', rabbit_types:amqqueue(), absent_reason()} | + {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} | + {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} | + {'absent', amqqueue:amqqueue(), absent_reason()} | rabbit_types:channel_exit(). --spec internal_declare(rabbit_types:amqqueue(), boolean()) -> +-spec internal_declare(amqqueue:amqqueue(), boolean()) -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent()). -spec update - (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> - 'not_found' | rabbit_types:amqqueue(). + (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) -> + 'not_found' | amqqueue:amqqueue(). -spec lookup (name()) -> - rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:ok(amqqueue:amqqueue()) | rabbit_types:error('not_found'); ([name()]) -> - [rabbit_types:amqqueue()]. + [amqqueue:amqqueue()]. -spec not_found_or_absent(name()) -> not_found_or_absent(). -spec with(name(), qfun(A)) -> A | rabbit_types:error(not_found_or_absent()). -spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B. -spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). +-spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit(). +-spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) -> + rabbit_types:channel_exit(). -spec assert_equivalence - (rabbit_types:amqqueue(), boolean(), boolean(), + (amqqueue:amqqueue(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). --spec check_exclusive_access(rabbit_types:amqqueue(), pid()) -> +-spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> 'ok' | rabbit_types:channel_exit(). -spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit(). --spec list() -> [rabbit_types:amqqueue()]. --spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list() -> [amqqueue:amqqueue()]. +-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. -spec list_names() -> [rabbit_amqqueue:name()]. --spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. --spec list_by_type(atom()) -> [rabbit_types:amqqueue()]. +-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +-spec list_by_type(atom()) -> [amqqueue:amqqueue()]. -spec info_keys() -> rabbit_types:info_keys(). --spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). --spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> +-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). +-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'. --spec consumers(rabbit_types:amqqueue()) -> +-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. +-spec consumers(amqqueue:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), rabbit_framing:amqp_table()}]. -spec consumer_info_keys() -> rabbit_types:info_keys(). -spec consumers_all(rabbit_types:vhost()) -> [{name(), pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), rabbit_framing:amqp_table()}]. --spec stat(rabbit_types:amqqueue()) -> +-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. -spec delete_immediately(qpids()) -> 'ok'. -spec delete_exclusive(qpids(), pid()) -> 'ok'. -spec delete - (rabbit_types:amqqueue(), 'false', 'false', rabbit_types:username()) -> + (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) -> qlen(); - (rabbit_types:amqqueue(), 'true' , 'false', rabbit_types:username()) -> + (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) -> qlen() | rabbit_types:error('in_use'); - (rabbit_types:amqqueue(), 'false', 'true', rabbit_types:username()) -> + (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) -> qlen() | rabbit_types:error('not_empty'); - (rabbit_types:amqqueue(), 'true' , 'true', rabbit_types:username()) -> + (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty'). --spec delete_crashed(rabbit_types:amqqueue()) -> 'ok'. --spec delete_crashed_internal(rabbit_types:amqqueue(), rabbit_types:username()) -> 'ok'. --spec purge(rabbit_types:amqqueue()) -> {ok, qlen()}. +-spec delete_crashed(amqqueue:amqqueue()) -> 'ok'. +-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'. +-spec purge(amqqueue:amqqueue()) -> {ok, qlen()}. -spec forget_all_durable(node()) -> 'ok'. --spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') -> +-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') -> {qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}. --spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery()) -> 'ok'. +-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'. -spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. -spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. -spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(), @@ -172,24 +177,24 @@ -spec notify_down_all(qpids(), pid(), non_neg_integer()) -> ok_or_errors(). -spec activate_limit_all(qpids(), pid()) -> ok_or_errors(). --spec basic_get(rabbit_types:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(), +-spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(), #{Name :: atom() => rabbit_fifo_client:state()}) -> {'ok', non_neg_integer(), qmsg()} | 'empty'. -spec credit - (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), + (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. -spec basic_consume - (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(), non_neg_integer(), rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any(), rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) -> rabbit_types:ok_or_error('exclusive_consume_unavailable'). -spec basic_cancel - (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any(), + (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(), rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}. --spec notify_decorators(rabbit_types:amqqueue()) -> 'ok'. +-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. -spec resume(pid(), pid()) -> 'ok'. -spec internal_delete(name(), rabbit_types:username()) -> 'ok' | rabbit_types:connection_exit() | @@ -202,20 +207,21 @@ -spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. -spec on_node_up(node()) -> 'ok'. -spec on_node_down(node()) -> 'ok'. --spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue(). --spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue(). --spec store_queue(rabbit_types:amqqueue()) -> 'ok'. +-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue(). +-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue(). +-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). +-spec store_queue(amqqueue:amqqueue()) -> 'ok'. -spec update_decorators(name()) -> 'ok'. --spec policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> +-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. -spec update_mirroring(pid()) -> 'ok'. --spec sync_mirrors(rabbit_types:amqqueue() | pid()) -> +-spec sync_mirrors(amqqueue:amqqueue() | pid()) -> 'ok' | rabbit_types:error('not_mirrored'). --spec cancel_sync_mirrors(rabbit_types:amqqueue() | pid()) -> +-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) -> 'ok' | {'ok', 'not_syncing'}. --spec is_replicated(rabbit_types:amqqueue()) -> boolean(). +-spec is_replicated(amqqueue:amqqueue()) -> boolean(). --spec pid_of(rabbit_types:amqqueue()) -> +-spec pid_of(amqqueue:amqqueue()) -> {'ok', pid()} | rabbit_types:error('not_found'). -spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> {'ok', pid()} | rabbit_types:error('not_found'). @@ -252,7 +258,7 @@ recover_classic_queues(VHost, Queues) -> %% order as the supplied queue names, so that we can zip them together %% for further processing in recover_durable_queues. {ok, OrderedRecoveryTerms} = - BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]), + BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]), case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of {ok, _} -> recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); @@ -262,7 +268,7 @@ recover_classic_queues(VHost, Queues) -> end. filter_per_type(Queues) -> - lists:partition(fun(#amqqueue{type = Type}) -> Type == classic end, Queues). + lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues). filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). @@ -287,47 +293,42 @@ start(Qs) -> %% visible to routing, so now it is safe for them to complete %% their initialisation (which may involve interacting with other %% queues). - _ = [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Classic], + _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic], ok. mark_local_durable_queues_stopped(VHost) -> Qs = find_local_durable_classic_queues(VHost), rabbit_misc:execute_mnesia_transaction( fun() -> - [ store_queue(Q#amqqueue{ state = stopped }) - || Q = #amqqueue{ state = State } <- Qs, - State =/= stopped ] + [ store_queue(amqqueue:set_state(Q, stopped)) + || Q <- Qs, + amqqueue:get_state(Q) =/= stopped ] end). find_local_quorum_queues(VHost) -> Node = node(), mnesia:async_dirty( fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{vhost = VH, - type = quorum, - quorum_nodes = QuorumNodes} - <- mnesia:table(rabbit_durable_queue), - VH =:= VHost, - (lists:member(Node, QuorumNodes))])) + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + amqqueue:get_vhost(Q) =:= VHost, + amqqueue:is_quorum(Q) andalso + (lists:member(Node, amqqueue:get_quorum_nodes(Q)))])) end). find_local_durable_classic_queues(VHost) -> Node = node(), mnesia:async_dirty( fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, - vhost = VH, - pid = Pid, - type = classic} - <- mnesia:table(rabbit_durable_queue), - VH =:= VHost, - (is_local_to_node(Pid, Node) andalso + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + amqqueue:get_vhost(Q) =:= VHost, + amqqueue:is_classic(Q) andalso + (is_local_to_node(amqqueue:get_pid(Q), Node) andalso %% Terminations on node down will not remove the rabbit_queue %% record if it is a mirrored queue (such info is now obtained from %% the policy). Thus, we must check if the local pid is alive %% - if the record is present - in order to restart. - (mnesia:read(rabbit_queue, Name, read) =:= [] - orelse not rabbit_mnesia:is_process_alive(Pid))) + (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= [] + orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)))) ])) end). @@ -335,20 +336,16 @@ find_recoverable_queues() -> Node = node(), mnesia:async_dirty( fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, - pid = Pid, - type = Type, - quorum_nodes = QuorumNodes} - <- mnesia:table(rabbit_durable_queue), - (Type == classic andalso - (is_local_to_node(Pid, Node) andalso + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + (amqqueue:is_classic(Q) andalso + (is_local_to_node(amqqueue:get_pid(Q), Node) andalso %% Terminations on node down will not remove the rabbit_queue %% record if it is a mirrored queue (such info is now obtained from %% the policy). Thus, we must check if the local pid is alive %% - if the record is present - in order to restart. - (mnesia:read(rabbit_queue, Name, read) =:= [] - orelse not rabbit_mnesia:is_process_alive(Pid)))) - orelse (Type == quorum andalso lists:member(Node, QuorumNodes)) + (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= [] + orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))))) + orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, amqqueue:get_quorum_nodes(Q))) ])) end). @@ -372,32 +369,39 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, Owner, ActingUser, Node) -> ok = check_declare_arguments(QueueName, Args), Type = get_queue_type(Args), - Q = rabbit_queue_decorator:set( - rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - recoverable_slaves = [], - gm_pids = [], - state = live, - policy_version = 0, - slave_pids_pending_shutdown = [], - vhost = VHost, - options = #{user => ActingUser}, - type = Type})), - - case Type of - classic -> - declare_classic_queue(Q, Node); - quorum -> - rabbit_quorum_queue:declare(Q) + TypeIsAllowed = + Type =:= classic orelse + rabbit_feature_flags:is_enabled(quorum_queue), + case TypeIsAllowed of + true -> + Q0 = amqqueue:new(QueueName, + none, + Durable, + AutoDelete, + Owner, + Args, + VHost, + #{user => ActingUser}, + Type), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(Q0)), + do_declare(Q, Node); + false -> + rabbit_misc:protocol_error( + internal_error, + "Cannot declare a queue '~s' of type '~s' on node '~s': " + "the 'quorum_queue' feature flag is disabled", + [rabbit_misc:rs(QueueName), Type, Node]) end. -declare_classic_queue(#amqqueue{name = QName, vhost = VHost} = Q, Node) -> +do_declare(Q, Node) when ?amqqueue_is_classic(Q) -> + declare_classic_queue(Q, Node); +do_declare(Q, _Node) when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue:declare(Q). + +declare_classic_queue(Q, Node) -> + QName = amqqueue:get_name(Q), + VHost = amqqueue:get_vhost(Q), Node1 = case rabbit_queue_master_location_misc:get_location(Q) of {ok, Node0} -> Node0; {error, _} -> Node @@ -425,17 +429,18 @@ get_queue_type(Args) -> internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> - ok = store_queue(Q#amqqueue{state = live}), + ok = store_queue(amqqueue:set_state(Q, live)), rabbit_misc:const({created, Q}) end); -internal_declare(Q = #amqqueue{name = QueueName}, false) -> +internal_declare(Q, false) -> + QueueName = amqqueue:get_name(Q), rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case not_found_or_absent(QueueName) of not_found -> Q1 = rabbit_policy:set(Q), - Q2 = Q1#amqqueue{state = live}, + Q2 = amqqueue:set_state(Q1, live), ok = store_queue(Q2), fun () -> {created, Q2} end; {absent, _Q, _} = R -> rabbit_misc:const(R) @@ -447,7 +452,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of - [Q = #amqqueue{durable = Durable}] -> + [Q] -> + Durable = amqqueue:is_durable(Q), Q1 = Fun(Q), ok = mnesia:write(rabbit_queue, Q1, write), case Durable of @@ -468,14 +474,11 @@ ensure_rabbit_queue_record_is_initialized(Q) -> rabbit_misc:const({ok, Q}) end). -store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(rabbit_durable_queue, - Q#amqqueue{slave_pids = [], - sync_slave_pids = [], - gm_pids = [], - decorators = undefined}, write), +store_queue(Q) when ?amqqueue_is_durable(Q) -> + Q1 = amqqueue:reset_mirroring_and_decorators(Q), + ok = mnesia:write(rabbit_durable_queue, Q1, write), store_queue_ram(Q); -store_queue(Q = #amqqueue{durable = false}) -> +store_queue(Q) when not ?amqqueue_is_durable(Q) -> store_queue_ram(Q). store_queue_ram(Q) -> @@ -491,8 +494,9 @@ update_decorators(Name) -> end end). -policy_changed(Q1 = #amqqueue{decorators = Decorators1}, - Q2 = #amqqueue{decorators = Decorators2}) -> +policy_changed(Q1, Q2) -> + Decorators1 = amqqueue:get_decorators(Q1), + Decorators2 = amqqueue:get_decorators(Q2), rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), D1 = rabbit_queue_decorator:select(Decorators1), D2 = rabbit_queue_decorator:select(Decorators2), @@ -532,20 +536,20 @@ with(Name, F, E) -> with(Name, F, E, RetriesLeft) -> case lookup(Name) of - {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 -> + {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 -> %% Something bad happened to that queue, we are bailing out %% on processing current request. E({absent, Q, timeout}); - {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 -> + {ok, Q} when ?amqqueue_state_is(Q, stopped) andalso RetriesLeft =:= 0 -> %% The queue was stopped and not migrated E({absent, Q, stopped}); %% The queue process has crashed with unknown error - {ok, Q = #amqqueue{state = crashed}} -> + {ok, Q} when ?amqqueue_state_is(Q, crashed) -> E({absent, Q, crashed}); %% The queue process has been stopped by a supervisor. %% In that case a synchronised slave can take over %% so we should retry. - {ok, Q = #amqqueue{state = stopped}} -> + {ok, Q} when ?amqqueue_state_is(Q, stopped) -> %% The queue process was stopped by the supervisor rabbit_misc:with_exit_handler( fun () -> retry_wait(Q, F, E, RetriesLeft) end, @@ -553,7 +557,7 @@ with(Name, F, E, RetriesLeft) -> %% The queue is supposed to be active. %% The master node can go away or queue can be killed %% so we retry, waiting for a slave to take over. - {ok, Q = #amqqueue{state = live}} -> + {ok, Q} when ?amqqueue_state_is(Q, live) -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do %% with the QPid. F() should be written s.t. that this @@ -567,7 +571,10 @@ with(Name, F, E, RetriesLeft) -> E(not_found_or_absent_dirty(Name)) end. -retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) -> +retry_wait(Q, F, E, RetriesLeft) -> + Name = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + QState = amqqueue:get_state(Q), case {QState, is_replicated(Q)} of %% We don't want to repeat an operation if %% there are no slaves to migrate to @@ -592,14 +599,47 @@ retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, Retries with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> - with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); - ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + with(Name, F, fun (not_found) -> not_found(Name); + ({absent, Q, Reason}) -> absent(Q, Reason) end). -assert_equivalence(#amqqueue{name = QName, - durable = Durable, - auto_delete = AD} = Q, - Durable1, AD1, Args1, Owner) -> +not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]). + +absent(Q, AbsentReason) -> + QueueName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + IsDurable = amqqueue:is_durable(Q), + priv_absent(QueueName, QPid, IsDurable, AbsentReason). + +priv_absent(QueueName, QPid, true, nodedown) -> + %% The assertion of durability is mainly there because we mention + %% durability in the error message. That way we will hopefully + %% notice if at some future point our logic changes s.t. we get + %% here with non-durable queues. + rabbit_misc:protocol_error( + not_found, + "home node '~s' of durable ~s is down or inaccessible", + [node(QPid), rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, stopped) -> + rabbit_misc:protocol_error( + not_found, + "~s process is stopped by supervisor", [rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, crashed) -> + rabbit_misc:protocol_error( + not_found, + "~s has crashed and failed to restart", [rabbit_misc:rs(QueueName)]); + +priv_absent(QueueName, _QPid, _IsDurable, timeout) -> + rabbit_misc:protocol_error( + not_found, + "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]). + +assert_equivalence(Q, Durable1, AD1, Args1, Owner) -> + QName = amqqueue:get_name(Q), + Durable = amqqueue:is_durable(Q), + AD = amqqueue:is_auto_delete(Q), rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable), rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete), assert_args_equivalence(Q, Args1), @@ -607,11 +647,14 @@ assert_equivalence(#amqqueue{name = QName, check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> +check_exclusive_access(Q, Owner, _MatchType) + when ?amqqueue_exclusive_owner_is(Q, Owner) -> ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> +check_exclusive_access(Q, _ReaderPid, lax) + when ?amqqueue_exclusive_owner_is(Q, none) -> ok; -check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> +check_exclusive_access(Q, _ReaderPid, _MatchType) -> + QueueName = amqqueue:get_name(Q), rabbit_misc:protocol_error( resource_locked, "cannot obtain exclusive access to locked ~s", @@ -621,8 +664,9 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> with_or_die(Name, fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). -assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, - RequiredArgs) -> +assert_args_equivalence(Q, RequiredArgs) -> + QueueName = amqqueue:get_name(Q), + Args = amqqueue:get_arguments(Q), rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [Key || {Key, _Fun} <- declare_args()]). @@ -750,41 +794,37 @@ check_queue_type({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). +list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()). list_names() -> mnesia:dirty_all_keys(rabbit_queue). -list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)]. +list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)]. list_local_names() -> - [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), - State =/= crashed, is_local_to_node(QPid, node())]. + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( fun () -> mnesia:match_object(rabbit_durable_queue, - #amqqueue{_ = '_', type = Type}, read) + amqqueue:pattern_match_on_type(Type), + read) end), Qs. list_local_followers() -> - [ Q#amqqueue.name - || #amqqueue{state = State, type = quorum, pid = {_, Leader}, - quorum_nodes = Nodes} = Q <- list(), - State =/= crashed, Leader =/= node(), lists:member(node(), Nodes)]. + [ amqqueue:get_name(Q) + || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), amqqueue:get_quorum_nodes(Q))]. is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> Node =:= Leader. -qnode(QPid) when ?IS_CLASSIC(QPid) -> - node(QPid); -qnode({_, Node} = QPid) when ?IS_QUORUM(QPid) -> - Node. - list(VHostPath) -> list(VHostPath, rabbit_queue). @@ -795,7 +835,7 @@ list(VHostPath, TableName) -> fun () -> mnesia:match_object( TableName, - #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}, + amqqueue:pattern_match_on_name(rabbit_misc:r(VHostPath, queue)), read) end). @@ -805,8 +845,9 @@ list_down(VHostPath) -> true -> Present = list(VHostPath), Durable = list(VHostPath, rabbit_durable_queue), - PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), - sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + PresentS = sets:from_list([amqqueue:get_name(Q) || Q <- Present]), + sets:to_list(sets:filter(fun (Q) -> + N = amqqueue:get_name(Q), not sets:is_element(N, PresentS) end, sets:from_list(Durable))) end. @@ -818,7 +859,7 @@ count(VHost) -> %% won't work here because with master migration of mirrored queues %% the "ownership" of queues by nodes becomes a non-trivial problem %% that requires a proper consensus algorithm. - length(mnesia:dirty_index_read(rabbit_queue, VHost, #amqqueue.vhost)) + length(mnesia:dirty_index_read(rabbit_queue, VHost, amqqueue:field_vhost())) catch _:Err -> rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n", [VHost, Err]), @@ -829,9 +870,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). -is_unresponsive(#amqqueue{ state = crashed }, _Timeout) -> +is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) -> false; -is_unresponsive(#amqqueue{ pid = QPid }, Timeout) -> +is_unresponsive(Q, Timeout) -> + QPid = amqqueue:get_pid(Q), try delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}), false @@ -841,21 +883,24 @@ is_unresponsive(#amqqueue{ pid = QPid }, Timeout) -> true end. -format(Q = #amqqueue{ type = quorum }) -> rabbit_quorum_queue:format(Q); +format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q); format(_) -> []. -info(Q = #amqqueue{ type = quorum }) -> rabbit_quorum_queue:info(Q); -info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); -info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); -info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). +info(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q); +info(Q) when ?amqqueue_state_is(Q, crashed) -> info_down(Q, crashed); +info(Q) when ?amqqueue_state_is(Q, stopped) -> info_down(Q, stopped); +info(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). -info(Q = #amqqueue{ type = quorum }, Items) -> +info(Q, Items) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q, Items); -info(Q = #amqqueue{ state = crashed }, Items) -> +info(Q, Items) when ?amqqueue_state_is(Q, crashed) -> info_down(Q, Items, crashed); -info(Q = #amqqueue{ state = stopped }, Items) -> +info(Q, Items) when ?amqqueue_state_is(Q, stopped) -> info_down(Q, Items, stopped); -info(#amqqueue{ pid = QPid }, Items) -> +info(Q, Items) -> + QPid = amqqueue:get_pid(Q), case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; {error, Error} -> throw(Error) @@ -867,13 +912,13 @@ info_down(Q, DownReason) -> info_down(Q, Items, DownReason) -> [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. -i_down(name, #amqqueue{name = Name}, _) -> Name; -i_down(durable, #amqqueue{durable = Dur}, _) -> Dur; -i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; -i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; -i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; -i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS; -i_down(state, _Q, DownReason) -> DownReason; +i_down(name, Q, _) -> amqqueue:get_name(Q); +i_down(durable, Q, _) -> amqqueue:is_durable(Q); +i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q); +i_down(arguments, Q, _) -> amqqueue:get_arguments(Q); +i_down(pid, Q, _) -> amqqueue:get_pid(Q); +i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); +i_down(state, _Q, DownReason) -> DownReason; i_down(K, _Q, _DownReason) -> case lists:member(K, rabbit_amqqueue_process:info_keys()) of true -> ''; @@ -919,18 +964,22 @@ info_local(VHostPath) -> map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). list_local(VHostPath) -> - [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), - State =/= crashed, is_local_to_node(QPid, node()) ]. + [Q || Q <- list(VHostPath), + amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. -notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> +notify_policy_changed(Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), gen_server2:cast(QPid, policy_changed); -notify_policy_changed(#amqqueue{pid = QPid, - name = QName}) when ?IS_QUORUM(QPid) -> +notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) -> + QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), rabbit_quorum_queue:policy_changed(QName, QPid). -consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> +consumers(Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); -consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) -> +consumers(Q) when ?amqqueue_is_quorum(Q) -> + QPid = amqqueue:get_pid(Q), {ok, {_, Result}, _} = ra:local_query(QPid, fun rabbit_fifo:query_consumers/1), maps:values(Result). @@ -957,14 +1006,14 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) -> get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, + [amqqueue:get_name(Q), ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args]) || {ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)]. -stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q); -stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). +stat(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:stat(Q); +stat(Q) -> delegate:invoke(amqqueue:get_pid(Q), {gen_server2, call, [stat, infinity]}). -pid_of(#amqqueue{pid = Pid}) -> Pid. +pid_of(Q) -> amqqueue:get_pid(Q). pid_of(VHost, QueueName) -> case lookup(rabbit_misc:r(VHost, queue, QueueName)) of {ok, Q} -> pid_of(Q); @@ -990,15 +1039,16 @@ delete_immediately_by_resource(Resources) -> || {Resource, QPid} <- Quorum], ok. -delete(#amqqueue{ type = quorum} = Q, - IfUnused, IfEmpty, ActingUser) -> +delete(Q, + IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:delete(Q, IfUnused, IfEmpty, ActingUser); delete(Q, IfUnused, IfEmpty, ActingUser) -> case wait_for_promoted_or_stopped(Q) of - {promoted, #amqqueue{pid = QPid}} -> + {promoted, Q1} -> + QPid = amqqueue:get_pid(Q1), delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}); {stopped, Q1} -> - #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name, + #resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q1), case IfEmpty of true -> rabbit_log:error("Queue ~s in vhost ~s has its master node down and " @@ -1020,10 +1070,16 @@ delete(Q, IfUnused, IfEmpty, ActingUser) -> {ok, 0} end. --spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}. -wait_for_promoted_or_stopped(#amqqueue{name = QName}) -> +-spec wait_for_promoted_or_stopped(amqqueue:amqqueue()) -> + {promoted, amqqueue:amqqueue()} | + {stopped, amqqueue:amqqueue()} | + {error, not_found}. +wait_for_promoted_or_stopped(Q0) -> + QName = amqqueue:get_name(Q0), case lookup(QName) of - {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} -> + {ok, Q} -> + QPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), case rabbit_mnesia:is_process_alive(QPid) of true -> {promoted, Q}; false -> @@ -1046,17 +1102,20 @@ wait_for_promoted_or_stopped(#amqqueue{name = QName}) -> delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). -delete_crashed(#amqqueue{ pid = QPid } = Q, ActingUser) -> - ok = rpc:call(qnode(QPid), ?MODULE, delete_crashed_internal, [Q, ActingUser]). +delete_crashed(Q, ActingUser) -> + ok = rpc:call(amqqueue:qnode(Q), ?MODULE, delete_crashed_internal, [Q, ActingUser]). -delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) -> +delete_crashed_internal(Q, ActingUser) -> + QName = amqqueue:get_name(Q), {ok, BQ} = application:get_env(rabbit, backing_queue_module), BQ:delete_crashed(Q), ok = internal_delete(QName, ActingUser). -purge(#amqqueue{ pid = QPid, type = classic}) -> +purge(Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}); -purge(#amqqueue{ pid = NodeId, type = quorum}) -> +purge(Q) when ?amqqueue_is_quorum(Q) -> + NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). @@ -1129,25 +1188,31 @@ activate_limit_all(QRefs, ChPid) -> delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). -credit(#amqqueue{pid = QPid, type = classic}, ChPid, CTag, Credit, - Drain, QStates) -> +credit(Q, ChPid, CTag, Credit, + Drain, QStates) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}), {ok, QStates}; -credit(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum}, +credit(Q, _ChPid, CTag, Credit, - Drain, QStates) -> + Drain, QStates) when ?amqqueue_is_quorum(Q) -> + {Name, _} = Id = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), QState0 = get_quorum_state(Id, QName, QStates), {ok, QState} = rabbit_quorum_queue:credit(CTag, Credit, Drain, QState0), {ok, maps:put(Name, QState, QStates)}. -basic_get(#amqqueue{pid = QPid, type = classic}, ChPid, NoAck, LimiterPid, - _CTag, _) -> +basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _) + when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}); -basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChPid, NoAck, - _LimiterPid, CTag, QStates) -> +basic_get(Q, _ChPid, NoAck, _LimiterPid, CTag, QStates) + when ?amqqueue_is_quorum(Q) -> + {Name, _} = Id = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), QState0 = get_quorum_state(Id, QName, QStates), case rabbit_quorum_queue:basic_get(Q, NoAck, CTag, QState0) of {ok, empty, QState} -> @@ -1160,9 +1225,12 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP [rabbit_misc:rs(QName), Reason]) end. -basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, +basic_consume(Q, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, - ExclusiveConsume, Args, OkMsg, ActingUser, QState) -> + ExclusiveConsume, Args, OkMsg, ActingUser, QState) + when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), ok = check_consume_arguments(QName, Args), case delegate:invoke(QPid, {gen_server2, call, @@ -1174,14 +1242,18 @@ basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, Err -> Err end; -basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid, +basic_consume(Q, _NoAck, _ChPid, _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag, - _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) -> + _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) + when ?amqqueue_is_quorum(Q) -> {error, global_qos_not_supported_for_queue_type}; -basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, +basic_consume(Q, NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, - ActingUser, QStates) -> + ActingUser, QStates) + when ?amqqueue_is_quorum(Q) -> + {Name, _} = Id = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), ok = check_consume_arguments(QName, Args), QState0 = get_quorum_state(Id, QName, QStates), {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, @@ -1192,8 +1264,10 @@ basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. -basic_cancel(#amqqueue{pid = QPid, type = classic}, ChPid, ConsumerTag, OkMsg, ActingUser, - QState) -> +basic_cancel(Q, ChPid, ConsumerTag, OkMsg, ActingUser, + QState) + when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), case delegate:invoke(QPid, {gen_server2, call, [{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}) of @@ -1201,13 +1275,16 @@ basic_cancel(#amqqueue{pid = QPid, type = classic}, ChPid, ConsumerTag, OkMsg, A {ok, QState}; Err -> Err end; -basic_cancel(#amqqueue{pid = {Name, _} = Id, type = quorum}, ChPid, - ConsumerTag, OkMsg, _ActingUser, QStates) -> +basic_cancel(Q, ChPid, + ConsumerTag, OkMsg, _ActingUser, QStates) + when ?amqqueue_is_quorum(Q) -> + {Name, _} = Id = amqqueue:get_pid(Q), QState0 = get_quorum_state(Id, QStates), {ok, QState} = rabbit_quorum_queue:basic_cancel(ConsumerTag, ChPid, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. -notify_decorators(#amqqueue{pid = QPid}) -> +notify_decorators(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). notify_sent(QPid, ChPid) -> @@ -1267,10 +1344,10 @@ forget_all_durable(Node) -> mnesia:sync_transaction( fun () -> Qs = mnesia:match_object(rabbit_durable_queue, - #amqqueue{_ = '_'}, write), + amqqueue:pattern_match_all(), write), [forget_node_for_queue(Node, Q) || - #amqqueue{pid = Pid} = Q <- Qs, - is_local_to_node(Pid, Node)], + Q <- Qs, + is_local_to_node(amqqueue:get_pid(Q), Node)], ok end), ok. @@ -1278,26 +1355,30 @@ forget_all_durable(Node) -> %% Try to promote a slave while down - it should recover as a %% master. We try to take the oldest slave here for best chance of %% recovery. -forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum, - quorum_nodes = QN}) -> +forget_node_for_queue(DeadNode, Q) + when ?amqqueue_is_quorum(Q) -> + QN = amqqueue:get_quorum_nodes(Q), forget_node_for_queue(DeadNode, QN, Q); -forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> +forget_node_for_queue(DeadNode, Q) -> + RS = amqqueue:get_recoverable_slaves(Q), forget_node_for_queue(DeadNode, RS, Q). -forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> +forget_node_for_queue(_DeadNode, [], Q) -> %% No slaves to recover from, queue is gone. %% Don't process_deletions since that just calls callbacks and we %% are not really up. + Name = amqqueue:get_name(Q), internal_delete1(Name, true); %% Should not happen, but let's be conservative. forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> forget_node_for_queue(DeadNode, T, Q); -forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) -> +forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) -> + Type = amqqueue:get_type(Q), case {node_permits_offline_promotion(H), Type} of {false, _} -> forget_node_for_queue(DeadNode, T, Q); - {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + {true, classic} -> Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)), ok = mnesia:write(rabbit_durable_queue, Q1, write); {true, quorum} -> ok end. @@ -1329,37 +1410,40 @@ set_maximum_since_use(QPid, Age) -> update_mirroring(QPid) -> ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). -sync_mirrors(#amqqueue{pid = QPid}) -> +sync_mirrors(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); sync_mirrors(QPid) -> delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). -cancel_sync_mirrors(#amqqueue{pid = QPid}) -> +cancel_sync_mirrors(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); cancel_sync_mirrors(QPid) -> delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). -is_replicated(#amqqueue{type = quorum}) -> +is_replicated(Q) when ?amqqueue_is_quorum(Q) -> true; is_replicated(Q) -> rabbit_mirror_queue_misc:is_mirrored(Q). -is_dead_exclusive(#amqqueue{exclusive_owner = none}) -> +is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; -is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) -> +is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) -> + Pid = amqqueue:get_pid(Q), not rabbit_mnesia:is_process_alive(Pid). on_node_up(Node) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> Qs = mnesia:match_object(rabbit_queue, - #amqqueue{_ = '_'}, write), + amqqueue:pattern_match_all(), write), [maybe_clear_recoverable_node(Node, Q) || Q <- Qs], ok end). -maybe_clear_recoverable_node(Node, - #amqqueue{sync_slave_pids = SPids, - recoverable_slaves = RSs} = Q) -> +maybe_clear_recoverable_node(Node, Q) -> + SPids = amqqueue:get_sync_slave_pids(Q), + RSs = amqqueue:get_recoverable_slaves(Q), case lists:member(Node, RSs) of true -> %% There is a race with @@ -1381,7 +1465,7 @@ maybe_clear_recoverable_node(Node, if DoClearNode -> RSs1 = RSs -- [Node], store_queue( - Q#amqqueue{recoverable_slaves = RSs1}); + amqqueue:set_recoverable_slaves(Q, RSs1)); true -> ok end; false -> @@ -1421,10 +1505,10 @@ partition_queues(T) -> queues_to_delete_when_node_down(NodeDown) -> rabbit_misc:execute_mnesia_transaction(fun () -> - qlc:e(qlc:q([QName || - #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue), - qnode(Pid) == NodeDown andalso - not rabbit_mnesia:is_process_alive(Pid) andalso + qlc:e(qlc:q([amqqueue:get_name(Q) || + Q <- mnesia:table(rabbit_queue), + amqqueue:qnode(Q) == NodeDown andalso + not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso (not rabbit_amqqueue:is_replicated(Q) orelse rabbit_amqqueue:is_dead_exclusive(Q))] )) @@ -1454,21 +1538,21 @@ notify_queues_deleted(QueueDeletions) -> QueueDeletions). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, - auto_delete = false, - arguments = [], - pid = Pid, - slave_pids = []}. - -immutable(Q) -> Q#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - recoverable_slaves = none, - gm_pids = none, - policy = none, - decorators = none, - state = none}. + pseudo_queue(QueueName, Pid, false). + +pseudo_queue(QueueName, Pid, Durable) -> + amqqueue:new(QueueName, + Pid, + Durable, + false, + none, % Owner, + [], + undefined, % VHost, + #{user => undefined}, % ActingUser + classic % Type + ). + +immutable(Q) -> amqqueue:set_immutable(Q). deliver(Qs, Delivery) -> deliver(Qs, Delivery, untracked), @@ -1530,17 +1614,26 @@ deliver(Qs, Delivery = #delivery{flow = Flow, {QPids, QuorumPids, QueueState}. qpids([]) -> {[], [], []}; %% optimisation -qpids([#amqqueue{pid = {LocalName, LeaderNode}, type = quorum, name = QName}]) -> +qpids([Q]) when ?amqqueue_is_quorum(Q) -> + QName = amqqueue:get_name(Q), + {LocalName, LeaderNode} = amqqueue:get_pid(Q), {[{{LocalName, LeaderNode}, QName}], [], []}; %% opt -qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> +qpids([Q]) -> + QPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), {[], [QPid], SPids}; %% opt qpids(Qs) -> {QuoPids, MPids, SPids} = - lists:foldl(fun (#amqqueue{pid = QPid, type = quorum, name = QName}, - {QuoPidAcc, MPidAcc, SPidAcc}) -> + lists:foldl(fun (Q, + {QuoPidAcc, MPidAcc, SPidAcc}) + when ?amqqueue_is_quorum(Q) -> + QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), {[{QPid, QName} | QuoPidAcc], MPidAcc, SPidAcc}; - (#amqqueue{pid = QPid, slave_pids = SPids}, + (Q, {QuoPidAcc, MPidAcc, SPidAcc}) -> + QPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), {QuoPidAcc, [QPid | MPidAcc], [SPids | SPidAcc]} end, {[], [], []}, Qs), {QuoPids, MPids, lists:append(SPids)}. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5782bc6e23..c6684050dc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -17,6 +17,7 @@ -module(rabbit_amqqueue_process). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include("amqqueue.hrl"). -behaviour(gen_server2). @@ -35,7 +36,7 @@ %% Queue's state -record(q, { %% an #amqqueue record - q, + q :: amqqueue:amqqueue(), %% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer} active_consumer, %% Set to true if a queue has ever had a consumer. @@ -103,7 +104,7 @@ -spec info_keys() -> rabbit_types:info_keys(). -spec init_with_backing_queue_state - (rabbit_types:amqqueue(), atom(), tuple(), any(), + (amqqueue:amqqueue(), atom(), tuple(), any(), [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> #q{}. @@ -153,13 +154,13 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). init(Q) -> process_flag(trap_exit, true), - ?store_proc_name(Q#amqqueue.name), - {ok, init_state(Q#amqqueue{pid = self()}), hibernate, + ?store_proc_name(amqqueue:get_name(Q)), + {ok, init_state(amqqueue:set_pid(Q, self())), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, ?MODULE}. init_state(Q) -> - SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of + SingleActiveConsumerOn = case rabbit_misc:table_lookup(amqqueue:get_arguments(Q), <<"x-single-active-consumer">>) of {bool, true} -> true; _ -> false end, @@ -175,14 +176,16 @@ init_state(Q) -> single_active_consumer_on = SingleActiveConsumerOn}, rabbit_event:init_stats_timer(State, #q.stats_timer). -init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> +init_it(Recover, From, State = #q{q = Q}) + when ?amqqueue_exclusive_owner_is(Q, none) -> init_it2(Recover, From, State); %% You used to be able to declare an exclusive durable queue. Sadly we %% need to still tidy up after that case, there could be the remnants %% of one left over from an upgrade. So that's why we don't enforce %% Recover = new here. -init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> +init_it(Recover, From, State = #q{q = Q0}) -> + Owner = amqqueue:get_exclusive_owner(Q0), case rabbit_misc:is_process_alive(Owner) of true -> erlang:monitor(process, Owner), init_it2(Recover, From, State); @@ -204,7 +207,9 @@ init_it2(Recover, From, State = #q{q = Q, backing_queue_state = undefined}) -> {Barrier, TermsOrNew} = recovery_status(Recover), case rabbit_amqqueue:internal_declare(Q, Recover /= new) of - {Res, #amqqueue{} = Q1} when Res == created orelse Res == existing -> + {Res, Q1} + when ?is_amqqueue(Q1) andalso + (Res == created orelse Res == existing) -> case matches(Recover, Q, Q1) of true -> ok = file_handle_cache:register_callback( @@ -240,13 +245,14 @@ send_reply(From, Q) -> gen_server2:reply(From, Q). matches(new, Q1, Q2) -> %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso - Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; + amqqueue:get_name(Q1) =:= amqqueue:get_name(Q2) andalso + amqqueue:is_durable(Q1) =:= amqqueue:is_durable(Q2) andalso + amqqueue:is_auto_delete(Q1) =:= amqqueue:is_auto_delete(Q2) andalso + amqqueue:get_exclusive_owner(Q1) =:= amqqueue:get_exclusive_owner(Q2) andalso + amqqueue:get_arguments(Q1) =:= amqqueue:get_arguments(Q2) andalso + amqqueue:get_pid(Q1) =:= amqqueue:get_pid(Q2) andalso + amqqueue:get_slave_pids(Q1) =:= amqqueue:get_slave_pids(Q2); +%% FIXME: Should v1 vs. v2 of the same record match? matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. @@ -259,8 +265,9 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, +init_with_backing_queue_state(Q, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> + Owner = amqqueue:get_exclusive_owner(Q), case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -278,14 +285,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> + QName = amqqueue:get_name(Q0), rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown( fun (BQS) -> rabbit_misc:execute_mnesia_transaction( fun() -> [Q] = mnesia:read({rabbit_queue, QName}), - Q2 = Q#amqqueue{state = stopped}, + Q2 = amqqueue:set_state(Q, stopped), rabbit_amqqueue:store_queue(Q2) end), BQ:terminate(R, BQS) @@ -309,7 +317,7 @@ terminate(normal, State) -> %% delete case %% If we crashed don't try to clean up the BQS, probably best to leave it. terminate(_Reason, State = #q{q = Q}) -> terminate_shutdown(fun (BQS) -> - Q2 = Q#amqqueue{state = crashed}, + Q2 = amqqueue:set_state(Q, crashed), rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:store_queue(Q2) @@ -318,9 +326,10 @@ terminate(_Reason, State = #q{q = Q}) -> end, State). terminate_delete(EmitStats, Reason0, - State = #q{q = #amqqueue{name = QName}, + State = #q{q = Q, backing_queue = BQ, status = Status}) -> + QName = amqqueue:get_name(Q), ActingUser = terminated_by(Status), fun (BQS) -> Reason = case Reason0 of @@ -389,7 +398,8 @@ notify_decorators(State = #q{consumers = Consumers, decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed case rabbit_amqqueue:lookup(QName) of - {ok, Q = #amqqueue{decorators = Ds}} -> + {ok, Q} -> + Ds = amqqueue:get_decorators(Q), [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)]; {error, not_found} -> ok @@ -418,7 +428,8 @@ process_args_policy(State = #q{q = Q, Fun(args_policy_lookup(Name, Resolve, Q), StateN) end, State#q{args_policy_version = N + 1}, ArgsTable)). -args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> +args_policy_lookup(Name, Resolve, Q) -> + Args = amqqueue:get_arguments(Q), AName = <<"x-", Name/binary>>, case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of {undefined, undefined} -> undefined; @@ -442,7 +453,8 @@ init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}. init_dlx(undefined, State) -> State#q{dlx = undefined}; -init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> +init_dlx(DLX, State = #q{q = Q}) -> + QName = amqqueue:get_name(Q), State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. @@ -596,8 +608,9 @@ send_or_record_confirm(#delivery{confirm = true, message = #basic_message { is_persistent = true, id = MsgId}}, - State = #q{q = #amqqueue{durable = true}, - msg_id_to_channel = MTC}) -> + State = #q{q = Q, + msg_id_to_channel = MTC}) + when ?amqqueue_is_durable(Q) -> MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; send_or_record_confirm(#delivery{confirm = true, @@ -815,7 +828,8 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> run_message_queue(true, State1) end. -should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; +should_auto_delete(#q{q = Q}) + when not ?amqqueue_is_auto_delete(Q) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). @@ -896,7 +910,7 @@ is_unused(_State) -> rabbit_queue_consumers:count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -qname(#q{q = #amqqueue{name = QName}}) -> QName. +qname(#q{q = Q}) -> amqqueue:get_name(Q). backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1001,17 +1015,18 @@ stop(Reply, State) -> {stop, normal, Reply, State}. infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(name, #q{q = #amqqueue{name = Name}}) -> Name; -i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; -i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; -i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; +i(name, #q{q = Q}) -> amqqueue:get_name(Q); +i(durable, #q{q = Q}) -> amqqueue:is_durable(Q); +i(auto_delete, #q{q = Q}) -> amqqueue:is_auto_delete(Q); +i(arguments, #q{q = Q}) -> amqqueue:get_arguments(Q); i(pid, _) -> self(); -i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> +i(owner_pid, #q{q = Q}) when ?amqqueue_exclusive_owner_is(Q, none) -> ''; -i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> - ExclusiveOwner; -i(exclusive, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> +i(owner_pid, #q{q = Q}) -> + amqqueue:get_exclusive_owner(Q); +i(exclusive, #q{q = Q}) -> + ExclusiveOwner = amqqueue:get_exclusive_owner(Q), is_pid(ExclusiveOwner); i(policy, #q{q = Q}) -> case rabbit_policy:name(Q) of @@ -1061,27 +1076,27 @@ i(consumer_utilisation, #q{consumers = Consumers}) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, Q = #amqqueue{slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), +i(slave_pids, #q{q = Q0}) -> + Name = amqqueue:get_name(Q0), + {ok, Q} = rabbit_amqqueue:lookup(Name), case rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; - true -> SPids + true -> amqqueue:get_slave_pids(Q) end; -i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(Name), +i(synchronised_slave_pids, #q{q = Q0}) -> + Name = amqqueue:get_name(Q0), + {ok, Q} = rabbit_amqqueue:lookup(Name), case rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; - true -> SSPids + true -> amqqueue:get_sync_slave_pids(Q) end; -i(recoverable_slaves, #q{q = #amqqueue{name = Name, - durable = Durable}}) -> - {ok, Q = #amqqueue{recoverable_slaves = Nodes}} = - rabbit_amqqueue:lookup(Name), +i(recoverable_slaves, #q{q = Q0}) -> + Name = amqqueue:get_name(Q0), + Durable = amqqueue:is_durable(Q0), + {ok, Q} = rabbit_amqqueue:lookup(Name), case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; - true -> Nodes + true -> amqqueue:get_recoverable_slaves(Q) end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; @@ -1090,7 +1105,8 @@ i(garbage_collection, _State) -> i(reductions, _State) -> {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; -i(user_who_performed_action, #q{q = #amqqueue{options = Opts}}) -> +i(user_who_performed_action, #q{q = Q}) -> + Opts = amqqueue:get_options(Q), maps:get(user, Opts, ?UNKNOWN_USER); i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:info(Item, BQS). @@ -1174,7 +1190,8 @@ consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> {_, _} -> Low end. -prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> +prioritise_info(Msg, _Len, #q{q = Q}) -> + DownPid = amqqueue:get_exclusive_owner(Q), case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; @@ -1225,7 +1242,8 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, - State = #q{q = #amqqueue{name = QName}}) -> + State = #q{q = Q}) -> + QName = amqqueue:get_name(Q), AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of @@ -1546,7 +1564,8 @@ handle_cast(notify_decorators, State) -> notify_decorators(State), noreply(State); -handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> +handle_cast(policy_changed, State = #q{q = Q0}) -> + Name = amqqueue:get_name(Q0), %% We depend on the #q.q field being up to date at least WRT %% policy (but not slave pids) in various places, so when it %% changes we go and read it from Mnesia again. @@ -1556,7 +1575,8 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> {ok, Q} = rabbit_amqqueue:lookup(Name), noreply(process_args_policy(State#q{q = Q})); -handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) -> +handle_cast({sync_start, _, _}, State = #q{q = Q}) -> + Name = amqqueue:get_name(Q), %% Only a slave should receive this, it means we are a duplicated master rabbit_mirror_queue_misc:log_warning( Name, "Stopping after receiving sync_start from another master", []), @@ -1587,7 +1607,7 @@ handle_info(emit_stats, State) -> {noreply, State1, Timeout}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, - State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + State = #q{q = Q}) when ?amqqueue_exclusive_owner_is(Q, DownPid) -> %% Exclusively owned queues must disappear with their owner. In %% the case of clean shutdown we delete the queue synchronously in %% the reader - although not required by the spec this seems to @@ -1665,21 +1685,23 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); -log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) -> +log_delete_exclusive(ConPid, #q{ q = Q }) -> + Resource = amqqueue:get_name(Q), #resource{ name = QName, virtual_host = VHost } = Resource, rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++ "because its declaring connection ~p was closed", [QName, VHost, ConPid]). -log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) -> +log_auto_delete(Reason, #q{ q = Q }) -> + Resource = amqqueue:get_name(Q), #resource{ name = QName, virtual_host = VHost } = Resource, rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++ Reason, [QName, VHost]). needs_update_mirroring(Q, Version) -> - {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name), - DBVersion = UpQ#amqqueue.policy_version, + {ok, UpQ} = rabbit_amqqueue:lookup(amqqueue:get_name(Q)), + DBVersion = amqqueue:get_policy_version(UpQ), case DBVersion > Version of true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion}; false -> false diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 54a97b717e..7c5a70b529 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -26,7 +26,7 @@ %%---------------------------------------------------------------------------- --spec start_link(rabbit_types:amqqueue(), rabbit_prequeue:start_mode()) -> +-spec start_link(amqqueue:amqqueue(), rabbit_prequeue:start_mode()) -> {'ok', pid(), pid()}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index d8045276c6..7e40e54aca 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -32,7 +32,7 @@ -spec start_link() -> rabbit_types:ok_pid_or_error(). -spec start_queue_process - (node(), rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') -> + (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') -> pid(). %%---------------------------------------------------------------------------- @@ -41,7 +41,7 @@ start_link() -> supervisor2:start_link(?MODULE, []). start_queue_process(Node, Q, StartMode) -> - #amqqueue{name = #resource{virtual_host = VHost}} = Q, + #resource{virtual_host = VHost} = amqqueue:get_name(Q), {ok, Sup} = find_for_vhost(VHost, Node), {ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]), QPid. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl new file mode 100644 index 0000000000..abd8635720 --- /dev/null +++ b/src/rabbit_backing_queue.erl @@ -0,0 +1,273 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_backing_queue). + +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, head_message_timestamp, + disk_reads, disk_writes, backing_queue_status, + messages_paged_out, message_bytes_paged_out]). + +%% We can't specify a per-queue ack/state with callback signatures +-type ack() :: any(). +-type state() :: any(). + +-type flow() :: 'flow' | 'noflow'. +-type msg_ids() :: [rabbit_types:msg_id()]. +-type publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean()}. +-type delivered_publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties()}. +-type fetch_result(Ack) :: + ('empty' | {rabbit_types:basic_message(), boolean(), Ack}). +-type drop_result(Ack) :: + ('empty' | {rabbit_types:msg_id(), Ack}). +-type recovery_terms() :: [term()] | 'non_clean_shutdown'. +-type recovery_info() :: 'new' | recovery_terms(). +-type purged_msg_count() :: non_neg_integer(). +-type async_callback() :: + fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok'). +-type duration() :: ('undefined' | 'infinity' | number()). + +-type msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A). +-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()). + +-type queue_mode() :: atom(). + +-spec info_keys() -> rabbit_types:info_keys(). + +%% Called on startup with a vhost and a list of durable queue names on this vhost. +%% The queues aren't being started at this point, but this call allows the +%% backing queue to perform any checking necessary for the consistency +%% of those queues, or initialise any other shared resources. +%% +%% The list of queue recovery terms returned as {ok, Terms} must be given +%% in the same order as the list of queue names supplied. +-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). + +%% Called to tear down any state/resources for vhost. NB: Implementations should +%% not depend on this function being called on shutdown and instead +%% should hook into the rabbit supervision hierarchy. +-callback stop(rabbit_types:vhost()) -> 'ok'. + +%% Initialise the backing queue and its state. +%% +%% Takes +%% 1. the amqqueue record +%% 2. a term indicating whether the queue is an existing queue that +%% should be recovered or not. When 'new' is given, no recovery is +%% taking place, otherwise a list of recovery terms is given, or +%% the atom 'non_clean_shutdown' if no recovery terms are available. +%% 3. an asynchronous callback which accepts a function of type +%% backing-queue-state to backing-queue-state. This callback +%% function can be safely invoked from any process, which makes it +%% useful for passing messages back into the backing queue, +%% especially as the backing queue does not have control of its own +%% mailbox. +-callback init(amqqueue:amqqueue(), recovery_info(), + async_callback()) -> state(). + +%% Called on queue shutdown when queue isn't being deleted. +-callback terminate(any(), state()) -> state(). + +%% Called when the queue is terminating and needs to delete all its +%% content. +-callback delete_and_terminate(any(), state()) -> state(). + +%% Called to clean up after a crashed queue. In this case we don't +%% have a process and thus a state(), we are just removing on-disk data. +-callback delete_crashed(amqqueue:amqqueue()) -> 'ok'. + +%% Remove all 'fetchable' messages from the queue, i.e. all messages +%% except those that have been fetched already and are pending acks. +-callback purge(state()) -> {purged_msg_count(), state()}. + +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + +%% Publish a message. +-callback publish(rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean(), pid(), flow(), + state()) -> state(). + +%% Like publish/6 but for batches of publishes. +-callback batch_publish([publish()], pid(), flow(), state()) -> state(). + +%% Called for messages which have already been passed straight +%% out to a client. The queue will be empty for these calls +%% (i.e. saves the round trip through the backing queue). +-callback publish_delivered(rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), flow(), + state()) + -> {ack(), state()}. + +%% Like publish_delivered/5 but for batches of publishes. +-callback batch_publish_delivered([delivered_publish()], pid(), flow(), + state()) + -> {[ack()], state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ. +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). + +%% Return ids of messages which have been confirmed since the last +%% invocation of this function (or initialisation). +%% +%% Message ids should only appear in the result of drain_confirmed +%% under the following circumstances: +%% +%% 1. The message appears in a call to publish_delivered/4 and the +%% first argument (ack_required) is false; or +%% 2. The message is fetched from the queue with fetch/2 and the first +%% argument (ack_required) is false; or +%% 3. The message is acked (ack/2 is called for the message); or +%% 4. The message is fully fsync'd to disk in such a way that the +%% recovery of the message is guaranteed in the event of a crash of +%% this rabbit node (excluding hardware failure). +%% +%% In addition to the above conditions, a message id may only appear +%% in the result of drain_confirmed if +%% #message_properties.needs_confirming = true when the msg was +%% published (through whichever means) to the backing queue. +%% +%% It is legal for the same message id to appear in the results of +%% multiple calls to drain_confirmed, which means that the backing +%% queue is not required to keep track of which messages it has +%% already confirmed. The confirm will be issued to the publisher the +%% first time the message id appears in the result of +%% drain_confirmed. All subsequent appearances of that message id will +%% be ignored. +-callback drain_confirmed(state()) -> {msg_ids(), state()}. + +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predictate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. +-callback fetchwhile(msg_pred(), msg_fun(A), A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. + +%% Produce the next message. +-callback fetch(true, state()) -> {fetch_result(ack()), state()}; + (false, state()) -> {fetch_result(undefined), state()}. + +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + +%% Acktags supplied are for messages which can now be forgotten +%% about. Must return 1 msg_id per Ack, in the same order as Acks. +-callback ack([ack()], state()) -> {msg_ids(), state()}. + +%% Reinsert messages into the queue which have already been delivered +%% and were pending acknowledgement. +-callback requeue([ack()], state()) -> {msg_ids(), state()}. + +%% Fold over messages by ack tag. The supplied function is called with +%% each message, its ack tag, and an accumulator. +-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. + +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), + rabbit_types:message_properties(), + boolean(), A) -> {('stop' | 'cont'), A}), + A, state()) -> {A, state()}. + +%% How long is my queue? +-callback len(state()) -> non_neg_integer(). + +%% Is my queue empty? +-callback is_empty(state()) -> boolean(). + +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). + +%% For the next three functions, the assumption is that you're +%% monitoring something like the ingress and egress rates of the +%% queue. The RAM duration is thus the length of time represented by +%% the messages held in RAM given the current rates. If you want to +%% ignore all of this stuff, then do so, and return 0 in +%% ram_duration/1. + +%% The target is to have no more messages in RAM than indicated by the +%% duration and the current queue rates. +-callback set_ram_duration_target(duration(), state()) -> state(). + +%% Optionally recalculate the duration internally (likely to be just +%% update your internal rates), and report how many seconds the +%% messages in RAM represent given the current rates of the queue. +-callback ram_duration(state()) -> {duration(), state()}. + +%% Should 'timeout' be called as soon as the queue process can manage +%% (either on an empty mailbox, or when a timer fires)? +-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'. + +%% Called (eventually) after needs_timeout returns 'idle' or 'timed'. +%% Note this may be called more than once for each 'idle' or 'timed' +%% returned from needs_timeout +-callback timeout(state()) -> state(). + +%% Called immediately before the queue hibernates. +-callback handle_pre_hibernate(state()) -> state(). + +%% Called when more credit has become available for credit_flow. +-callback resume(state()) -> state(). + +%% Used to help prioritisation in rabbit_amqqueue_process. The rate of +%% inbound messages and outbound messages at the moment. +-callback msg_rates(state()) -> {float(), float()}. + +-callback info(atom(), state()) -> any(). + +%% Passed a function to be invoked with the relevant backing queue's +%% state. Useful for when the backing queue or other components need +%% to pass functions into the backing queue. +-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). + +%% Called prior to a publish or publish_delivered call. Allows the BQ +%% to signal that it's already seen this message, (e.g. it was published +%% or discarded previously) specifying whether to drop the message or reject it. +-callback is_duplicate(rabbit_types:basic_message(), state()) + -> {{true, drop} | {true, reject} | boolean(), state()}. + +-callback set_queue_mode(queue_mode(), state()) -> state(). + +-callback zip_msgs_and_acks(delivered_publish(), + [ack()], Acc, state()) + -> Acc. + +%% Called when rabbit_amqqueue_process receives a message via +%% handle_info and it should be processed by the backing +%% queue +-callback handle_info(term(), state()) -> state(). + +info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb1c754b5c..2b63b809b8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -15,7 +15,8 @@ %% -module(rabbit_binding). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]). -export([list_for_source/1, list_for_destination/1, @@ -44,7 +45,7 @@ {'resources_missing', [{'not_found', (rabbit_types:binding_source() | rabbit_types:binding_destination())} | - {'absent', rabbit_types:amqqueue()}]}). + {'absent', amqqueue:amqqueue()}]}). -type bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error( @@ -53,7 +54,7 @@ -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: fun((rabbit_types:exchange(), - rabbit_types:exchange() | rabbit_types:amqqueue()) -> + rabbit_types:exchange() | amqqueue:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error())). -type bindings() :: [rabbit_types:binding()]. @@ -393,7 +394,8 @@ remove_transient_for_destination(DstName) -> %%---------------------------------------------------------------------------- durable(#exchange{durable = D}) -> D; -durable(#amqqueue{durable = D}) -> D. +durable(Q) when ?is_amqqueue(Q) -> + amqqueue:is_durable(Q). binding_action(Binding = #binding{source = SrcName, destination = DstName, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 634789adab..dac3037bff 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -51,6 +51,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -behaviour(gen_server2). @@ -1366,7 +1367,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> + {ok, {Q, _CParams}} when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), QRef = qpid_to_ref(QPid), QCons1 = @@ -1636,7 +1638,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, Username, QueueStates0), Q} end) of - {{ok, QueueStates}, Q = #amqqueue{pid = QPid, name = QName}} -> + {{ok, QueueStates}, Q} when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), CM1 = maps:put( ActualConsumerTag, {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, @@ -1649,7 +1653,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, true -> consumer_monitor(ActualConsumerTag, State1); false -> State1 end}; - {ok, Q = #amqqueue{pid = QPid, name = QName}} -> + {ok, Q} when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), CM1 = maps:put( ActualConsumerTag, {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, @@ -1674,7 +1680,8 @@ consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, queue_consumers = QCons}) -> - {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping), + {Q, _} = maps:get(ConsumerTag, ConsumerMapping), + QPid = amqqueue:get_pid(Q), QRef = qpid_to_ref(QPid), CTags1 = case maps:find(QRef, QCons) of {ok, CTags} -> gb_sets:insert(ConsumerTag, CTags); @@ -1782,7 +1789,7 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, destination = DestinationName, key = RoutingKey, args = Arguments}, - fun (_X, Q = #amqqueue{}) -> + fun (_X, Q) when ?is_amqqueue(Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) catch exit:Reason -> {error, Reason} end; @@ -1791,9 +1798,9 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, end, Username) of {error, {resources_missing, [{not_found, Name} | _]}} -> - rabbit_misc:not_found(Name); + rabbit_amqqueue:not_found(Name); {error, {resources_missing, [{absent, Q, Reason} | _]}} -> - rabbit_misc:absent(Q, Reason); + rabbit_amqqueue:absent(Q, Reason); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", @@ -1956,8 +1963,9 @@ foreach_per_queue(F, UAL, Acc) -> rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). consumer_queue_refs(Consumers) -> - lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} - <- maps:to_list(Consumers)]). + lists:usort([qpid_to_ref(amqqueue:get_pid(Q)) + || {_Key, {Q, _CParams}} <- maps:to_list(Consumers), + amqqueue:is_amqqueue(Q)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -2011,9 +2019,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ %% since alternative algorithms to update queue_names less %% frequently would in fact be more expensive in the common case. {QNames1, QMons1} = - lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, - {QNames0, QMons0}) -> + lists:foldl(fun (Q, {QNames0, QMons0}) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), QRef = qpid_to_ref(QPid), + QName = amqqueue:get_name(Q), {case maps:is_key(QRef, QNames0) of true -> QNames0; false -> maps:put(QRef, QName, QNames0) @@ -2287,7 +2296,7 @@ handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), case declare_fast_reply_to(StrippedQueueNameBin) of exists -> {ok, QueueName, 0, 1}; - not_found -> rabbit_misc:not_found(QueueName) + not_found -> rabbit_amqqueue:not_found(QueueName) end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, @@ -2338,11 +2347,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner, Username) of - {new, #amqqueue{pid = QPid}} -> + {new, Q} when ?is_amqqueue(Q) -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as %% the connection shuts down. + QPid = amqqueue:get_pid(Q), ok = case {Owner, CollectorPid} of {none, _} -> ok; {_, none} -> ok; %% Supports call from mgmt API @@ -2357,7 +2367,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, handle_method(Declare, ConnPid, CollectorPid, VHostPath, User); {absent, Q, Reason} -> - rabbit_misc:absent(Q, Reason); + rabbit_amqqueue:absent(Q, Reason); {owner_died, _Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, @@ -2365,7 +2375,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {ok, QueueName, 0, 0} end; {error, {absent, Q, Reason}} -> - rabbit_misc:absent(Q, Reason) + rabbit_amqqueue:absent(Q, Reason) end; handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, @@ -2373,9 +2383,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ConnPid, _CollectorPid, VHostPath, _User) -> StrippedQueueNameBin = strip_cr_lf(QueueNameBin), QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), - {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = - rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), + Fun = fun (Q0) -> + QStat = maybe_stat(NoWait, Q0), + {QStat, Q0} + end, + %% Note: no need to check if Q is an #amqqueue, with_or_die does it + {{ok, MessageCount, ConsumerCount}, Q} = rabbit_amqqueue:with_or_die(QueueName, Fun), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), {ok, QueueName, MessageCount, ConsumerCount}; handle_method(#'queue.delete'{queue = QueueNameBin, @@ -2399,7 +2412,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, {ok, 0}; ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username), {ok, 0}; - ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + ({absent, Q, Reason}) -> rabbit_amqqueue:absent(Q, Reason) end) of {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl new file mode 100644 index 0000000000..158eca4df8 --- /dev/null +++ b/src/rabbit_core_ff.erl @@ -0,0 +1,53 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_core_ff). + +-export([quorum_queue_migration/3]). + +-rabbit_feature_flag( + {quorum_queue, + #{desc => "Support queues of type `quorum`", + doc_url => "http://www.rabbitmq.com/quorum-queues.html", + stability => stable, + migration_fun => {?MODULE, quorum_queue_migration} + }}). + +quorum_queue_migration(FeatureName, _FeatureProps, enable) -> + Tables = [rabbit_queue, + rabbit_durable_queue], + rabbit_table:wait(Tables), + Fields = amqqueue:fields(amqqueue_v2), + migrate_to_amqqueue_with_type(FeatureName, Tables, Fields); +quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) -> + Fields = amqqueue:fields(amqqueue_v2), + mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso + mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields. + +migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) -> + rabbit_log:info("Feature flag `~s`: migrating Mnesia table ~s...", + [FeatureName, Table]), + Fun = fun(Queue) -> amqqueue:upgrade_to(amqqueue_v2, Queue) end, + case mnesia:transform_table(Table, Fun, Fields) of + {atomic, ok} -> migrate_to_amqqueue_with_type(FeatureName, + Rest, + Fields); + {aborted, Reason} -> {error, Reason} + end; +migrate_to_amqqueue_with_type(FeatureName, [], _) -> + rabbit_log:info("Feature flag `~s`: Mnesia tables migration done", + [FeatureName]), + ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9c879ad041..2480e7c3d0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -259,7 +259,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of {ok, X} -> X; - {error, not_found} -> rabbit_misc:not_found(Name) + {error, not_found} -> rabbit_amqqueue:not_found(Name) end. list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). diff --git a/src/rabbit_fhc_helpers.erl b/src/rabbit_fhc_helpers.erl index 90833795da..57dbd981f1 100644 --- a/src/rabbit_fhc_helpers.erl +++ b/src/rabbit_fhc_helpers.erl @@ -18,7 +18,7 @@ -export([clear_read_cache/0]). --include("rabbit.hrl"). % For #amqqueue record definition. +-include("amqqueue.hrl"). clear_read_cache() -> case application:get_env(rabbit, fhc_read_buffering) of @@ -37,7 +37,9 @@ clear_vhost_read_cache([VHost | Rest]) -> clear_queue_read_cache([]) -> ok; -clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) -> +clear_queue_read_cache([Q | Rest]) when ?is_amqqueue(Q) -> + MPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), %% Limit the action to the current node. Pids = [P || P <- [MPid | SPids], node(P) =:= node()], %% This function is executed in the context of the backing queue diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index f2426e156f..9e5bddb28a 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -26,7 +26,8 @@ -behaviour(gen_server2). -behaviour(gm). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -include("gm_specs.hrl"). -record(state, { q, @@ -37,7 +38,7 @@ }). -spec start_link - (rabbit_types:amqqueue(), pid() | 'undefined', + (amqqueue:amqqueue(), pid() | 'undefined', rabbit_mirror_queue_master:death_fun(), rabbit_mirror_queue_master:depth_fun()) -> rabbit_types:ok_pid_or_error(). @@ -319,7 +320,8 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> +init([Q, GM, DeathFun, DepthFun]) when ?is_amqqueue(Q) -> + QueueName = amqqueue:get_name(Q), ?store_proc_name(QueueName), GM1 = case GM of undefined -> @@ -345,9 +347,9 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). -handle_cast({gm_deaths, DeadGMPids}, - State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) - when node(MPid) =:= node() -> +handle_cast({gm_deaths, DeadGMPids}, State = #state{q = Q}) when ?amqqueue_pid_runs_on_local_node(Q) -> + QueueName = amqqueue:get_name(Q), + MPid = amqqueue:get_pid(Q), case rabbit_mirror_queue_misc:remove_from_queue( QueueName, MPid, DeadGMPids) of {ok, MPid, DeadPids, ExtraNodes} -> @@ -373,14 +375,15 @@ handle_cast({gm_deaths, DeadGMPids}, error(unexpected_mirrored_state) end; -handle_cast(request_depth, State = #state { depth_fun = DepthFun, - q = #amqqueue { name = QName, pid = MPid }}) -> +handle_cast(request_depth, State = #state{depth_fun = DepthFun, q = QArg}) when ?is_amqqueue(QArg) -> + QName = amqqueue:get_name(QArg), + MPid = amqqueue:get_pid(QArg), case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{ pid = MPid }} -> - ok = DepthFun(), - noreply(State); - _ -> - {stop, shutdown, State} + {ok, QFound} when ?amqqueue_pid_equals(QFound, MPid) -> + ok = DepthFun(), + noreply(State); + _ -> + {stop, shutdown, State} end; handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4cc6856442..a5736a36fc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -34,7 +34,8 @@ -behaviour(rabbit_backing_queue). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -record(state, { name, gm, @@ -68,7 +69,7 @@ -spec sender_death_fun() -> death_fun(). -spec depth_fun() -> depth_fun(). --spec init_with_existing_bq(rabbit_types:amqqueue(), atom(), any()) -> +-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) -> master_state(). -spec stop_mirroring(master_state()) -> {atom(), any()}. -spec sync_mirrors(stats_fun(), stats_fun(), master_state()) -> @@ -100,43 +101,46 @@ init(Q, Recover, AsyncCallback) -> ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State. -init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> +init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) -> + QName = amqqueue:get_name(Q0), case rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun(), depth_fun()) of - {ok, CPid} -> - GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - Self = self(), - ok = rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue{gm_pids = GMPids}] - = mnesia:read({rabbit_queue, QName}), - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], - state = live}) - end), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - %% We need synchronous add here (i.e. do not return until the - %% slave is running) so that when queue declaration is finished - %% all slaves are up; we don't want to end up with unsynced slaves - %% just by declaring a new queue. But add can't be synchronous all - %% the time as it can be called by slaves and that's - %% deadlock-prone. - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), - #state { name = QName, - gm = GM, - coordinator = CPid, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = #{}, - confirmed = [], - known_senders = sets:new(), - wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }; - {error, Reason} -> - %% The GM can shutdown before the coordinator has started up - %% (lost membership or missing group), thus the start_link of - %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process - % is trapping exists - throw({coordinator_not_started, Reason}) + Q0, undefined, sender_death_fun(), depth_fun()) of + {ok, CPid} -> + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + Fun = fun () -> + [Q1] = mnesia:read({rabbit_queue, QName}), + true = amqqueue:is_amqqueue(Q1), + GMPids0 = amqqueue:get_gm_pids(Q1), + GMPids1 = [{GM, Self} | GMPids0], + Q2 = amqqueue:set_gm_pids(Q1, GMPids1), + Q3 = amqqueue:set_state(Q2, live), + ok = rabbit_amqqueue:store_queue(Q3) + end, + ok = rabbit_misc:execute_mnesia_transaction(Fun), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), + %% We need synchronous add here (i.e. do not return until the + %% slave is running) so that when queue declaration is finished + %% all slaves are up; we don't want to end up with unsynced slaves + %% just by declaring a new queue. But add can't be synchronous all + %% the time as it can be called by slaves and that's + %% deadlock-prone. + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), + #state{name = QName, + gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = #{}, + confirmed = [], + known_senders = sets:new(), + wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)}; + {error, Reason} -> + %% The GM can shutdown before the coordinator has started up + %% (lost membership or missing group), thus the start_link of + %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process + % is trapping exists + throw({coordinator_not_started, Reason}) end. stop_mirroring(State = #state { coordinator = CPid, @@ -156,7 +160,8 @@ sync_mirrors(HandleInfo, EmitStats, QName, "Synchronising: " ++ Fmt ++ "~n", Params) end, Log("~p messages to synchronise", [BQ:len(BQS)]), - {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName), + {ok, Q} = rabbit_amqqueue:lookup(QName), + SPids = amqqueue:get_slave_pids(Q), SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q), Log("batch size: ~p", [SyncBatchSize]), Ref = make_ref(), @@ -193,8 +198,8 @@ terminate(Reason, %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this %% node. - {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(QName), + {ok, Q} = rabbit_amqqueue:lookup(QName), + SSPids = amqqueue:get_sync_slave_pids(Q), case SSPids =:= [] andalso rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of true -> %% Remove the whole queue to avoid data loss @@ -213,7 +218,8 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + {ok, Q} = rabbit_amqqueue:lookup(QName), + SPids = amqqueue:get_slave_pids(Q), rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT). purge(State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ac933d4df4..9fc70527c0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -31,7 +31,8 @@ %% for testing only -export([module/1]). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes). @@ -61,18 +62,18 @@ {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}. -spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'. --spec store_updated_slaves(rabbit_types:amqqueue()) -> - rabbit_types:amqqueue(). --spec initial_queue_node(rabbit_types:amqqueue(), node()) -> node(). --spec suggested_queue_nodes(rabbit_types:amqqueue()) -> +-spec store_updated_slaves(amqqueue:amqqueue()) -> + amqqueue:amqqueue(). +-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node(). +-spec suggested_queue_nodes(amqqueue:amqqueue()) -> {node(), [node()]}. --spec is_mirrored(rabbit_types:amqqueue()) -> boolean(). +-spec is_mirrored(amqqueue:amqqueue()) -> boolean(). -spec update_mirrors - (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'. + (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. -spec update_mirrors - (rabbit_types:amqqueue()) -> 'ok'. --spec maybe_drop_master_after_sync(rabbit_types:amqqueue()) -> 'ok'. --spec maybe_auto_sync(rabbit_types:amqqueue()) -> 'ok'. + (amqqueue:amqqueue()) -> 'ok'. +-spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'. +-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'. -spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. -spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'. @@ -86,10 +87,11 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. Or, gm group could've altered. see rabbitmq-server#914 case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - sync_slave_pids = SyncSPids, - gm_pids = GMPids }] -> + [Q0] when ?is_amqqueue(Q0) -> + QPid = amqqueue:get_pid(Q0), + SPids = amqqueue:get_slave_pids(Q0), + SyncSPids = amqqueue:get_sync_slave_pids(Q0), + GMPids = amqqueue:get_gm_pids(Q0), {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -109,7 +111,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> promote_slave(Alive) end, DoNotPromote = SyncSPids =:= [] andalso - rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>, + rabbit_policy:get(<<"ha-promote-on-failure">>, Q0) =:= <<"when-synced">>, case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, DeadPids, []}; @@ -124,23 +126,23 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% we're ok to update mnesia; or we have %% become the master. If gm altered, %% we have no choice but to proceed. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, - store_updated_slaves(Q1), + Q1 = amqqueue:set_pid(Q0, QPid1), + Q2 = amqqueue:set_slave_pids(Q1, SPids1), + Q3 = amqqueue:set_gm_pids(Q2, AliveGM), + store_updated_slaves(Q3), %% If we add and remove nodes at the %% same time we might tell the old %% master we need to sync and then %% shut it down. So let's check if %% the new master needs to sync. - maybe_auto_sync(Q1), - {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)}; + maybe_auto_sync(Q3), + {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q3, DeadGMPids)}; _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, - store_updated_slaves(Q1), + Q1 = amqqueue:set_slave_pids(Q0, Alive), + Q2 = amqqueue:set_gm_pids(Q1, AliveGM), + store_updated_slaves(Q2), {ok, QPid1, DeadPids, []} end end @@ -185,13 +187,12 @@ on_vhost_up(VHost) -> fun () -> mnesia:foldl( fun - (#amqqueue{name = #resource{virtual_host = OtherVhost}}, - QNames0) when OtherVhost =/= VHost -> + (Q, QNames0) when not ?amqqueue_vhost_equals(Q, VHost) -> QNames0; - (Q = #amqqueue{name = QName, - pid = Pid, - slave_pids = SPids, - type = classic}, QNames0) -> + (Q, QNames0) when ?amqqueue_is_classic(Q) -> + QName = amqqueue:get_name(Q), + Pid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), %% We don't want to pass in the whole %% cluster - we don't want a situation %% where starting one node causes us to @@ -221,7 +222,10 @@ drop_mirrors(QName, Nodes) -> drop_mirror(QName, MirrorNode) -> case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} -> + {ok, Q} when ?is_amqqueue(Q) -> + Name = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> {error, {queue_not_mirrored_on_node, MirrorNode}}; @@ -247,7 +251,7 @@ add_mirror(QName, MirrorNode, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - #amqqueue{name = #resource{virtual_host = VHost}} = Q, + #resource{virtual_host = VHost} = amqqueue:get_name(Q), case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of {ok, _} -> SPid = rabbit_amqqueue_sup_sup:start_queue_process( @@ -285,19 +289,21 @@ log_warning(QName, Fmt, Args) -> rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{slave_pids = SPids, - sync_slave_pids = SSPids, - recoverable_slaves = RS}) -> +store_updated_slaves(Q0) when ?is_amqqueue(Q0) -> + SPids = amqqueue:get_slave_pids(Q0), + SSPids = amqqueue:get_sync_slave_pids(Q0), + RS0 = amqqueue:get_recoverable_slaves(Q0), %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - recoverable_slaves = update_recoverable(SPids, RS), - state = live}, - ok = rabbit_amqqueue:store_queue(Q1), + Q1 = amqqueue:set_sync_slave_pids(Q0, SSPids1), + RS1 = update_recoverable(SPids, RS0), + Q2 = amqqueue:set_recoverable_slaves(Q1, RS1), + Q3 = amqqueue:set_state(Q2, live), + ok = rabbit_amqqueue:store_queue(Q3), %% Wake it up so that we emit a stats event - rabbit_amqqueue:notify_policy_changed(Q1), - Q1. + rabbit_amqqueue:notify_policy_changed(Q3), + Q3. %% Recoverable nodes are those which we could promote if the whole %% cluster were to suddenly stop and we then lose the master; i.e. all @@ -346,13 +352,14 @@ stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) -> %% notice and update Mnesia. But we just removed them all, and %% have stopped listening ourselves. So manually clean up. rabbit_misc:execute_mnesia_transaction(fun () -> - [Q] = mnesia:read({rabbit_queue, QName}), - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { gm_pids = [], slave_pids = [], - %% Restarted slaves on running nodes can - %% ensure old incarnations are stopped using - %% the pending slave pids. - slave_pids_pending_shutdown = PendingSlavePids}) + [Q0] = mnesia:read({rabbit_queue, QName}), + Q1 = amqqueue:set_gm_pids(Q0, []), + Q2 = amqqueue:set_slave_pids(Q1, []), + %% Restarted slaves on running nodes can + %% ensure old incarnations are stopped using + %% the pending slave pids. + Q3 = amqqueue:set_slave_pids_pending_shutdown(Q2, PendingSlavePids), + rabbit_mirror_queue_misc:store_updated_slaves(Q3) end), ok = gm:forget_group(QName). @@ -373,7 +380,8 @@ suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). %% The third argument exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction %% or both. -suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) -> +suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) -> + Owner = amqqueue:get_exclusive_owner(Q), {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> DefNode; @@ -395,7 +403,7 @@ policy(Policy, Q) -> P -> P end. -module(#amqqueue{} = Q) -> +module(Q) when ?is_amqqueue(Q) -> case rabbit_policy:get(<<"ha-mode">>, Q) of undefined -> not_mirrored; Mode -> module(Mode) @@ -430,16 +438,18 @@ is_mirrored_ha_nodes(Q) -> _ -> false end. -actual_queue_nodes(#amqqueue{pid = MPid, - slave_pids = SPids, - sync_slave_pids = SSPids}) -> +actual_queue_nodes(Q) when ?is_amqqueue(Q) -> + MPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), + SSPids = amqqueue:get_sync_slave_pids(Q), Nodes = fun (L) -> [node(Pid) || Pid <- L] end, {case MPid of none -> none; _ -> node(MPid) end, Nodes(SPids), Nodes(SSPids)}. -maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> +maybe_auto_sync(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), case policy(<<"ha-sync-mode">>, Q) of <<"automatic">> -> spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end); @@ -447,23 +457,27 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> ok end. -sync_queue(Q) -> - rabbit_amqqueue:with( - Q, fun(#amqqueue{pid = QPid, type = classic}) -> - rabbit_amqqueue:sync_mirrors(QPid); - (#amqqueue{type = quorum}) -> - {error, quorum_queue_not_supported} - end). - -cancel_sync_queue(Q) -> - rabbit_amqqueue:with( - Q, fun(#amqqueue{pid = QPid, type = classic}) -> - rabbit_amqqueue:cancel_sync_mirrors(QPid); - (#amqqueue{type = quorum}) -> - {error, quorum_queue_not_supported} - end). - -sync_batch_size(#amqqueue{} = Q) -> +sync_queue(Q0) -> + F = fun + (Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), + rabbit_amqqueue:sync_mirrors(QPid); + (Q) when ?amqqueue_is_quorum(Q) -> + {error, quorum_queue_not_supported} + end, + rabbit_amqqueue:with(Q0, F). + +cancel_sync_queue(Q0) -> + F = fun + (Q) when ?amqqueue_is_classic(Q) -> + QPid = amqqueue:get_pid(Q), + rabbit_amqqueue:cancel_sync_mirrors(QPid); + (Q) when ?amqqueue_is_quorum(Q) -> + {error, quorum_queue_not_supported} + end, + rabbit_amqqueue:with(Q0, F). + +sync_batch_size(Q) when ?is_amqqueue(Q) -> case policy(<<"ha-sync-batch-size">>, Q) of none -> %% we need this case because none > 1 == true default_batch_size(); @@ -479,14 +493,17 @@ default_batch_size() -> rabbit_misc:get_env(rabbit, mirroring_sync_batch_size, ?DEFAULT_BATCH_SIZE). -update_mirrors(OldQ = #amqqueue{pid = QPid}, - NewQ = #amqqueue{pid = QPid}) -> +update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) -> + % Note: we do want to ensure both queues have same pid + QPid = amqqueue:get_pid(OldQ), + QPid = amqqueue:get_pid(NewQ), case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors(Q = #amqqueue{name = QName}) -> +update_mirrors(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], @@ -512,8 +529,9 @@ update_mirrors(Q = #amqqueue{name = QName}) -> %% We don't just call update_mirrors/2 here since that could decide to %% start a slave for some other reason, and since we are the slave ATM %% that allows complicated deadlocks. -maybe_drop_master_after_sync(Q = #amqqueue{name = QName, - pid = MPid}) -> +maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + MPid = amqqueue:get_pid(Q), {DesiredMNode, DesiredSNodes} = suggested_queue_nodes(Q), case node(MPid) of DesiredMNode -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index bf0a5a7ed0..637d49035e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -35,8 +35,9 @@ -behaviour(gen_server2). -behaviour(gm). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -include("gm_specs.hrl"). %%---------------------------------------------------------------------------- @@ -76,8 +77,9 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q) -> - ?store_proc_name(Q#amqqueue.name), +init(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + ?store_proc_name(QName), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, ?MODULE}. @@ -85,7 +87,8 @@ init(Q) -> go(SPid, sync) -> gen_server2:call(SPid, go, infinity); go(SPid, async) -> gen_server2:cast(SPid, go). -handle_go(Q = #amqqueue{name = QName}) -> +handle_go(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -119,10 +122,10 @@ handle_go(Q = #amqqueue{name = QName}) -> ok = rabbit_memory_monitor:register( Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), - Q1 = Q #amqqueue { pid = QPid }, + QPid = amqqueue:get_pid(Q), _ = BQ:delete_crashed(Q), %% For crash recovery - BQS = bq_init(BQ, Q1, new), - State = #state { q = Q1, + BQS = bq_init(BQ, Q, new), + State = #state { q = Q, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -139,7 +142,7 @@ handle_go(Q = #amqqueue{name = QName}) -> }, ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), - rabbit_mirror_queue_misc:maybe_auto_sync(Q1), + rabbit_mirror_queue_misc:maybe_auto_sync(Q), {ok, State}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( @@ -163,8 +166,11 @@ handle_go(Q = #amqqueue{name = QName}) -> init_it(Self, GM, Node, QName) -> case mnesia:read({rabbit_queue, QName}) of - [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids, - slave_pids_pending_shutdown = PSPids}] -> + [Q] when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + SPids = amqqueue:get_slave_pids(Q), + GMPids = amqqueue:get_gm_pids(Q), + PSPids = amqqueue:get_slave_pids_pending_shutdown(Q), case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> stop_pending_slaves(QName, PSPids), add_slave(Q, Self, GM), @@ -175,12 +181,11 @@ init_it(Self, GM, Node, QName) -> end; [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of true -> existing; - false -> GMPids1 = [T || T = {_, S} <- GMPids, - S =/= SPid], - Q1 = Q#amqqueue{ - slave_pids = SPids -- [SPid], - gm_pids = GMPids1}, - add_slave(Q1, Self, GM), + false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid], + SPids1 = SPids -- [SPid], + Q1 = amqqueue:set_slave_pids(Q, SPids1), + Q2 = amqqueue:set_gm_pids(Q1, GMPids1), + add_slave(Q2, Self, GM), {new, QPid, GMPids1} end end; @@ -212,9 +217,14 @@ stop_pending_slaves(QName, Pids) -> %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). +add_slave(Q0, New, GM) when ?is_amqqueue(Q0) -> + SPids = amqqueue:get_slave_pids(Q0), + GMPids = amqqueue:get_gm_pids(Q0), + SPids1 = SPids ++ [New], + GMPids1 = [{GM, New} | GMPids], + Q1 = amqqueue:set_slave_pids(Q0, SPids1), + Q2 = amqqueue:set_gm_pids(Q1, GMPids1), + rabbit_mirror_queue_misc:store_updated_slaves(Q2). handle_call(go, _From, {not_started, Q} = NotStarted) -> case handle_go(Q) of @@ -223,10 +233,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> end; handle_call({gm_deaths, DeadGMPids}, From, - State = #state{ gm = GM, - q = Q = #amqqueue{ name = QName, pid = MPid }, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #state{ gm = GM, q = Q, + backing_queue = BQ, + backing_queue_state = BQS}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + MPid = amqqueue:get_pid(Q), Self = self(), case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> @@ -269,7 +280,9 @@ handle_call({gm_deaths, DeadGMPids}, From, %% death. That is all process_death does, create %% some traffic. ok = gm:broadcast(GM, process_death), - noreply(State #state { q = Q #amqqueue { pid = Pid } }) + Q1 = amqqueue:set_pid(Q, Pid), + State1 = State#state{q = Q1}, + noreply(State1) end end; @@ -285,20 +298,22 @@ handle_cast(go, {not_started, Q} = NotStarted) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) -> +handle_cast({gm, Instruction}, State = #state{q = Q0}) when ?is_amqqueue(Q0) -> + QName = amqqueue:get_name(Q0), case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{slave_pids = SPids}} -> - case lists:member(self(), SPids) of - true -> - handle_process_result(process_instruction(Instruction, State)); - false -> - %% Potentially a duplicated slave caused by a partial partition, - %% will stop as a new slave could start unaware of our presence - {stop, shutdown, State} - end; - {error, not_found} -> - %% Would not expect this to happen after fixing #953 - {stop, shutdown, State} + {ok, Q1} when ?is_amqqueue(Q1) -> + SPids = amqqueue:get_slave_pids(Q1), + case lists:member(self(), SPids) of + true -> + handle_process_result(process_instruction(Instruction, State)); + false -> + %% Potentially a duplicated slave caused by a partial partition, + %% will stop as a new slave could start unaware of our presence + {stop, shutdown, State} + end; + {error, not_found} -> + %% Would not expect this to happen after fixing #953 + {stop, shutdown, State} end; handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, @@ -521,11 +536,16 @@ handle_terminate([_SPid], _Reason) -> infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _State) -> self(); -i(name, #state { q = #amqqueue { name = Name } }) -> Name; -i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid; -i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; -i(Item, _State) -> throw({bad_argument, Item}). +i(pid, _State) -> + self(); +i(name, #state{q = Q}) when ?is_amqqueue(Q) -> + amqqueue:get_name(Q); +i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) -> + amqqueue:get_pid(Q); +i(is_synchronised, #state{depth_delta = DD}) -> + DD =:= 0; +i(Item, _State) -> + throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> Self = self(), @@ -550,7 +570,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, message = #basic_message { id = MsgId, is_persistent = true } }, - MS, #state { q = #amqqueue { durable = true } }) -> + MS, #state{q = Q}) when ?amqqueue_is_durable(Q) -> maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, confirm = true, @@ -595,7 +615,7 @@ handle_process_result({stop, State}) -> {stop, normal, State}. -spec promote_me({pid(), term()}, #state{}) -> no_return(). -promote_me(From, #state { q = Q = #amqqueue { name = QName }, +promote_me(From, #state { q = Q0, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -603,13 +623,14 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, sender_queues = SQ, msg_id_ack = MA, msg_id_status = MS, - known_senders = KS }) -> + known_senders = KS}) when ?is_amqqueue(Q0) -> + QName = amqqueue:get_name(Q0), rabbit_mirror_queue_misc:log_info(QName, "Promoting slave ~s to master~n", [rabbit_misc:pid_to_string(self())]), - Q1 = Q #amqqueue { pid = self() }, - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), - rabbit_mirror_queue_master:depth_fun()), + Q1 = amqqueue:set_pid(Q0, self()), + DeathFun = rabbit_mirror_queue_master:sender_death_fun(), + DepthFun = rabbit_mirror_queue_master:depth_fun(), + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q1, GM, DeathFun, DepthFun), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), @@ -1040,19 +1061,22 @@ update_ram_duration(BQ, BQS) -> rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQ:set_ram_duration_target(DesiredDuration, BQS1). -record_synchronised(#amqqueue { name = QName }) -> +record_synchronised(Q0) when ?is_amqqueue(Q0) -> + QName = amqqueue:get_name(Q0), Self = self(), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue { sync_slave_pids = SSPids }] -> - Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q2), - {ok, Q2} - end - end) of - ok -> ok; - {ok, Q} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q) + F = fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1] when ?is_amqqueue(Q1) -> + SSPids = amqqueue:get_sync_slave_pids(Q1), + SSPids1 = [Self | SSPids], + Q2 = amqqueue:set_sync_slave_pids(Q1, SSPids1), + rabbit_mirror_queue_misc:store_updated_slaves(Q2), + {ok, Q2} + end + end, + case rabbit_misc:execute_mnesia_transaction(F) of + ok -> ok; + {ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2) end. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7f07b205ff..ed6ffc9ecd 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -36,7 +36,8 @@ -behaviour(rabbit_runtime_parameter). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -import(rabbit_misc, [pget/2, pget/3]). @@ -59,16 +60,22 @@ register() -> rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE), rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE). -name(#amqqueue{policy = Policy}) -> name0(Policy); +name(Q) when ?is_amqqueue(Q) -> + Policy = amqqueue:get_policy(Q), + name0(Policy); name(#exchange{policy = Policy}) -> name0(Policy). -name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy); +name_op(Q) when ?is_amqqueue(Q) -> + OpPolicy = amqqueue:get_operator_policy(Q), + name0(OpPolicy); name_op(#exchange{operator_policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) -> +effective_definition(Q) when ?is_amqqueue(Q) -> + Policy = amqqueue:get_policy(Q), + OpPolicy = amqqueue:get_operator_policy(Q), effective_definition0(Policy, OpPolicy); effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) -> effective_definition0(Policy, OpPolicy). @@ -90,10 +97,15 @@ effective_definition0(Policy, OpPolicy) -> end, lists:umerge(Keys, OpKeys)). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name), - operator_policy = match_op(Name)}; -set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name), - operator_policy = match_op(Name)}. +set(Q0) when ?is_amqqueue(Q0) -> + Name = amqqueue:get_name(Q0), + Policy = match(Name), + OpPolicy = match_op(Name), + Q1 = amqqueue:set_policy(Q0, Policy), + Q2 = amqqueue:set_operator_policy(Q1, OpPolicy), + Q2; +set(X = #exchange{name = Name}) -> + X#exchange{policy = match(Name), operator_policy = match_op(Name)}. match(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). @@ -101,7 +113,9 @@ match(Name = #resource{virtual_host = VHost}) -> match_op(Name = #resource{virtual_host = VHost}) -> match(Name, list_op(VHost)). -get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) -> +get(Name, Q) when ?is_amqqueue(Q) -> + Policy = amqqueue:get_policy(Q), + OpPolicy = amqqueue:get_operator_policy(Q), get0(Name, Policy, OpPolicy); get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) -> get0(Name, Policy, OpPolicy); @@ -170,7 +184,7 @@ recover() -> %% variants. recover0() -> Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), - Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}), + Qs = mnesia:dirty_match_object(rabbit_durable_queue, amqqueue:pattern_match_all()), Policies = list(), OpPolicies = list_op(), [rabbit_misc:execute_mnesia_transaction( @@ -182,15 +196,18 @@ recover0() -> operator_policy = match(Name, OpPolicies)}), write) end) || X = #exchange{name = Name} <- Xs], - [rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:write( - rabbit_durable_queue, - rabbit_queue_decorator:set( - Q#amqqueue{policy = match(Name, Policies), - operator_policy = match(Name, OpPolicies)}), - write) - end) || Q = #amqqueue{name = Name} <- Qs], + [begin + QName = amqqueue:get_name(Q0), + Policy1 = match(QName, Policies), + Q1 = amqqueue:set_policy(Q0, Policy1), + OpPolicy1 = match(QName, OpPolicies), + Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1), + Q3 = rabbit_queue_decorator:set(Q2), + F = fun () -> + mnesia:write(rabbit_durable_queue, Q3, write) + end, + rabbit_misc:execute_mnesia_transaction(F) + end || Q0 <- Qs], ok. invalid_file() -> @@ -395,25 +412,26 @@ update_exchange(X = #exchange{name = XName, end end. -update_queue(Q = #amqqueue{name = QName, - policy = OldPolicy, - operator_policy = OldOpPolicy}, - Policies, OpPolicies) -> +update_queue(Q0, Policies, OpPolicies) when ?is_amqqueue(Q0) -> + QName = amqqueue:get_name(Q0), + OldPolicy = amqqueue:get_policy(Q0), + OldOpPolicy = amqqueue:get_operator_policy(Q0), case {match(QName, Policies), match(QName, OpPolicies)} of {OldPolicy, OldOpPolicy} -> no_change; {NewPolicy, NewOpPolicy} -> - NewQueue = rabbit_amqqueue:update( - QName, - fun(Q1) -> - rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy, - operator_policy = NewOpPolicy, - policy_version = - Q1#amqqueue.policy_version + 1 }) - end), + F = fun (QFun0) -> + QFun1 = amqqueue:set_policy(QFun0, NewPolicy), + QFun2 = amqqueue:set_operator_policy(QFun1, NewOpPolicy), + NewPolicyVersion = amqqueue:get_policy_version(QFun2) + 1, + QFun3 = amqqueue:set_policy_version(QFun2, NewPolicyVersion), + rabbit_queue_decorator:set(QFun3) + end, + NewQueue = rabbit_amqqueue:update(QName, F), case NewQueue of - #amqqueue{} = Q1 -> {Q, Q1}; - not_found -> {Q, Q } + Q1 when ?is_amqqueue(Q1) -> + {Q0, Q1}; + not_found -> + {Q0, Q0} end end. @@ -421,7 +439,7 @@ notify(no_change)-> ok; notify({X1 = #exchange{}, X2 = #exchange{}}) -> rabbit_exchange:policy_changed(X1, X2); -notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> +notify({Q1, Q2}) when ?is_amqqueue(Q1), ?is_amqqueue(Q2) -> rabbit_amqqueue:policy_changed(Q1, Q2). match(Name, Policies) -> diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 63dbec545b..b750482ba1 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -29,7 +29,8 @@ -behaviour(gen_server2). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). %%---------------------------------------------------------------------------- @@ -37,7 +38,7 @@ -type start_mode() :: 'declare' | 'recovery' | 'slave'. --spec start_link(rabbit_types:amqqueue(), start_mode(), pid()) +-spec start_link(amqqueue:amqqueue(), start_mode(), pid()) -> rabbit_types:ok_pid_or_error(). %%---------------------------------------------------------------------------- @@ -57,20 +58,22 @@ init({Q, StartMode, Marker}) -> init(Q, master) -> rabbit_amqqueue_process:init(Q); init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); -init(#amqqueue{name = QueueName}, restart) -> - {ok, Q = #amqqueue{pid = QPid, - slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), +init(Q0, restart) when ?is_amqqueue(Q0) -> + QueueName = amqqueue:get_name(Q0), + {ok, Q1} = rabbit_amqqueue:lookup(QueueName), + QPid = amqqueue:get_pid(Q1), + SPids = amqqueue:get_slave_pids(Q1), LocalOrMasterDown = node(QPid) =:= node() orelse not rabbit_mnesia:on_running_node(QPid), Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], case rabbit_mnesia:is_process_alive(QPid) of true -> false = LocalOrMasterDown, %% assertion rabbit_mirror_queue_slave:go(self(), async), - rabbit_mirror_queue_slave:init(Q); %% [1] + rabbit_mirror_queue_slave:init(Q1); %% [1] false -> case LocalOrMasterDown andalso Slaves =:= [] of - true -> crash_restart(Q); %% [2] + true -> crash_restart(Q1); %% [2] false -> timer:sleep(25), - init(Q, restart) %% [3] + init(Q1, restart) %% [3] end end. %% [1] There is a master on another node. Regardless of whether we @@ -83,10 +86,12 @@ init(#amqqueue{name = QueueName}, restart) -> %% not a stable situation. Sleep and wait for somebody else to make a %% move. -crash_restart(Q = #amqqueue{name = QueueName}) -> +crash_restart(Q0) when ?is_amqqueue(Q0) -> + QueueName = amqqueue:get_name(Q0), rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), gen_server2:cast(self(), init), - rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}). + Q1 = amqqueue:set_pid(Q0, self()), + rabbit_amqqueue_process:init(Q1). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index b41511f874..621f42dafb 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -16,8 +16,10 @@ -module(rabbit_priority_queue). --include_lib("rabbit.hrl"). --include_lib("rabbit_framing.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include("amqqueue.hrl"). + -behaviour(rabbit_backing_queue). %% enabled unconditionally. Disabling priority queueing after @@ -102,10 +104,14 @@ stop(VHost) -> %%---------------------------------------------------------------------------- -mutate_name(P, Q = #amqqueue{name = QName = #resource{name = QNameBin}}) -> - Q#amqqueue{name = QName#resource{name = mutate_name_bin(P, QNameBin)}}. +mutate_name(P, Q) when ?is_amqqueue(Q) -> + Res0 = #resource{name = QNameBin0} = amqqueue:get_name(Q), + QNameBin1 = mutate_name_bin(P, QNameBin0), + Res1 = Res0#resource{name = QNameBin1}, + amqqueue:set_name(Q, Res1). -mutate_name_bin(P, NameBin) -> <<NameBin/binary, 0, P:8>>. +mutate_name_bin(P, NameBin) -> + <<NameBin/binary, 0, P:8>>. expand_queues(QNames) -> lists:unzip( @@ -125,7 +131,8 @@ collapse_recovery(QNames, DupNames, Recovery) -> end, dict:new(), lists:zip(DupNames, Recovery)), [dict:fetch(Name, NameToTerms) || Name <- QNames]. -priorities(#amqqueue{arguments = Args}) -> +priorities(Q) when ?is_amqqueue(Q) -> + Args = amqqueue:get_arguments(Q), Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of {Type, RequestedMax} -> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 0c234e1072..6f40637b93 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -16,7 +16,8 @@ -module(rabbit_queue_decorator). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([select/1, set/1, register/2, unregister/1]). @@ -26,18 +27,18 @@ %%---------------------------------------------------------------------------- --callback startup(rabbit_types:amqqueue()) -> 'ok'. +-callback startup(amqqueue:amqqueue()) -> 'ok'. --callback shutdown(rabbit_types:amqqueue()) -> 'ok'. +-callback shutdown(amqqueue:amqqueue()) -> 'ok'. --callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> +-callback policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. --callback active_for(rabbit_types:amqqueue()) -> boolean(). +-callback active_for(amqqueue:amqqueue()) -> boolean(). %% called with Queue, MaxActivePriority, IsEmpty -callback consumer_state_changed( - rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. + amqqueue:amqqueue(), integer(), boolean()) -> 'ok'. %%---------------------------------------------------------------------------- @@ -47,7 +48,9 @@ removed_from_rabbit_registry(_Type) -> ok. select(Modules) -> [M || M <- Modules, code:which(M) =/= non_existing]. -set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. +set(Q) when ?is_amqqueue(Q) -> + Decorators = [D || D <- list(), D:active_for(Q)], + amqqueue:set_decorators(Q, Decorators). list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. @@ -61,13 +64,18 @@ unregister(TypeName) -> [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], ok. -maybe_recover(Q = #amqqueue{name = Name, - decorators = Decs}) -> - #amqqueue{decorators = Decs1} = set(Q), - Old = lists:sort(select(Decs)), +maybe_recover(Q0) when ?is_amqqueue(Q0) -> + Name = amqqueue:get_name(Q0), + Decs0 = amqqueue:get_decorators(Q0), + Q1 = set(Q0), + Decs1 = amqqueue:get_decorators(Q1), + Old = lists:sort(select(Decs0)), New = lists:sort(select(Decs1)), case New of - Old -> ok; - _ -> [M:startup(Q) || M <- New -- Old], - rabbit_amqqueue:update_decorators(Name) + Old -> + ok; + _ -> + %% TODO LRB JSP 160169569 should startup be passed Q1 here? + [M:startup(Q0) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) end. diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl index aa07637ab1..4c51bf52d2 100644 --- a/src/rabbit_queue_location_client_local.erl +++ b/src/rabbit_queue_location_client_local.erl @@ -17,7 +17,8 @@ -module(rabbit_queue_location_client_local). -behaviour(rabbit_queue_master_locator). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([description/0, queue_master_location/1]). @@ -37,4 +38,5 @@ description() -> [{description, <<"Locate queue master node as the client local node">>}]. -queue_master_location(#amqqueue{}) -> {ok, node()}. +queue_master_location(Q) when ?is_amqqueue(Q) -> + {ok, node()}. diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index a3a3021229..9462e15db9 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -17,7 +17,8 @@ -module(rabbit_queue_location_min_masters). -behaviour(rabbit_queue_master_locator). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([description/0, queue_master_location/1]). @@ -37,7 +38,7 @@ description() -> [{description, <<"Locate queue master node from cluster node with least bound queues">>}]. -queue_master_location(#amqqueue{} = Q) -> +queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster = rabbit_queue_master_location_misc:all_nodes(Q), QueueNames = rabbit_amqqueue:list_names(), MastersPerNode = lists:foldl( diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index a92c178dfe..e166fa350d 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -17,7 +17,8 @@ -module(rabbit_queue_location_random). -behaviour(rabbit_queue_master_locator). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([description/0, queue_master_location/1]). @@ -37,7 +38,7 @@ description() -> [{description, <<"Locate queue master node from cluster in a random manner">>}]. -queue_master_location(#amqqueue{} = Q) -> +queue_master_location(Q) when ?is_amqqueue(Q) -> Cluster = rabbit_queue_master_location_misc:all_nodes(Q), RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), MasterNode = lists:nth(RandomPos + 1, Cluster), diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl index 68a4bc3e06..2c7bb41b7e 100644 --- a/src/rabbit_queue_location_validator.erl +++ b/src/rabbit_queue_location_validator.erl @@ -17,7 +17,8 @@ -module(rabbit_queue_location_validator). -behaviour(rabbit_policy_validator). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([validate_policy/1, validate_strategy/1]). @@ -49,7 +50,7 @@ policy(Policy, Q) -> P -> P end. -module(#amqqueue{} = Q) -> +module(Q) when ?is_amqqueue(Q) -> case policy(<<"queue-master-locator">>, Q) of undefined -> no_location_strategy; Mode -> module(Mode) diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl index 6df7e5db6f..b31fd50192 100644 --- a/src/rabbit_queue_master_location_misc.erl +++ b/src/rabbit_queue_master_location_misc.erl @@ -16,7 +16,8 @@ -module(rabbit_queue_master_location_misc). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). -export([lookup_master/2, lookup_queue/2, @@ -28,22 +29,25 @@ lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> - Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), - case rabbit_amqqueue:lookup(Queue) of - {ok, #amqqueue{pid = Pid}} when is_pid(Pid) -> + QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(QueueR) of + {ok, Queue} when ?amqqueue_has_valid_pid(Queue) -> + Pid = amqqueue:get_pid(Queue), {ok, node(Pid)}; Error -> Error end. lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin), is_binary(VHostPath) -> - Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin), - case rabbit_amqqueue:lookup(Queue) of - Reply = {ok, #amqqueue{}} -> Reply; - Error -> Error + QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin), + case rabbit_amqqueue:lookup(QueueR) of + Reply = {ok, Queue} when ?is_amqqueue(Queue) -> + Reply; + Error -> + Error end. -get_location(Queue=#amqqueue{})-> +get_location(Queue) when ?is_amqqueue(Queue) -> Reply1 = case get_location_mod_by_args(Queue) of _Err1 = {error, _} -> case get_location_mod_by_policy(Queue) of @@ -62,7 +66,8 @@ get_location(Queue=#amqqueue{})-> Error -> Error end. -get_location_mod_by_args(#amqqueue{arguments=Args}) -> +get_location_mod_by_args(Queue) when ?is_amqqueue(Queue) -> + Args = amqqueue:get_arguments(Queue), case rabbit_misc:table_lookup(Args, <<"x-queue-master-locator">>) of {_Type, Strategy} -> case rabbit_queue_location_validator:validate_strategy(Strategy) of @@ -72,7 +77,7 @@ get_location_mod_by_args(#amqqueue{arguments=Args}) -> _ -> {error, "x-queue-master-locator undefined"} end. -get_location_mod_by_policy(Queue=#amqqueue{}) -> +get_location_mod_by_policy(Queue) when ?is_amqqueue(Queue) -> case rabbit_policy:get(<<"queue-master-locator">> , Queue) of undefined -> {error, "queue-master-locator policy undefined"}; Strategy -> @@ -82,7 +87,7 @@ get_location_mod_by_policy(Queue=#amqqueue{}) -> end end. -get_location_mod_by_config(#amqqueue{}) -> +get_location_mod_by_config(Queue) when ?is_amqqueue(Queue) -> case application:get_env(rabbit, queue_master_locator) of {ok, Strategy} -> case rabbit_queue_location_validator:validate_strategy(Strategy) of @@ -92,7 +97,7 @@ get_location_mod_by_config(#amqqueue{}) -> _ -> {error, "queue_master_locator undefined"} end. -all_nodes(Queue = #amqqueue{}) -> +all_nodes(Queue) when ?is_amqqueue(Queue) -> handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue). handle_is_mirrored_ha_nodes(false, _Queue) -> diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl new file mode 100644 index 0000000000..eba1e2aefa --- /dev/null +++ b/src/rabbit_queue_master_locator.erl @@ -0,0 +1,28 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_master_locator). + +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + +-callback description() -> [proplists:property()]. +-callback queue_master_location(amqqueue:amqqueue()) -> + {'ok', node()} | {'error', term()}. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b4edca7937..45a1c12cac 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -41,6 +41,7 @@ %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-include("amqqueue.hrl"). -type ra_server_id() :: {Name :: atom(), Node :: node()}. -type msg_id() :: non_neg_integer(). @@ -49,8 +50,8 @@ -spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. --spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() | - {'absent', rabbit_types:amqqueue(), atom()}]. +-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() | + {'absent', amqqueue:amqqueue(), atom()}]. -spec stop(rabbit_types:vhost()) -> 'ok'. -spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. @@ -59,10 +60,10 @@ -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. --spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). --spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). +-spec info(amqqueue:amqqueue()) -> rabbit_types:infos(). +-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). -spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). --spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. +-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. -spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}. @@ -93,8 +94,9 @@ init_state({Name, _}, QName = #resource{}) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not %% know what to do if the queue has `disappeared`. Let it crash. - {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} = - rabbit_amqqueue:lookup(QName), + {ok, Q} = rabbit_amqqueue:lookup(QName), + Leader = amqqueue:get_pid(Q), + Nodes = amqqueue:get_quorum_nodes(Q), %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], @@ -105,14 +107,15 @@ init_state({Name, _}, QName = #resource{}) -> handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). --spec declare(rabbit_types:amqqueue()) -> - {'new', rabbit_types:amqqueue()} | - {existing, rabbit_types:amqqueue()}. -declare(#amqqueue{name = QName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Arguments, - options = Opts} = Q) -> +-spec declare(amqqueue:amqqueue()) -> + {'new', amqqueue:amqqueue()} | + {existing, amqqueue:amqqueue()}. +declare(Q) when ?amqqueue_is_quorum(Q) -> + QName = amqqueue:get_name(Q), + Durable = amqqueue:is_durable(Q), + AutoDelete = amqqueue:is_auto_delete(Q), + Arguments = amqqueue:get_arguments(Q), + Opts = amqqueue:get_options(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), check_invalid_arguments(QName, Arguments), check_auto_delete(Q), @@ -122,9 +125,9 @@ declare(#amqqueue{name = QName, RaName = qname_to_rname(QName), Id = {RaName, node()}, Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)), - NewQ0 = Q#amqqueue{pid = Id, - quorum_nodes = Nodes}, - case rabbit_amqqueue:internal_declare(NewQ0, false) of + NewQ0 = amqqueue:set_pid(Q, Id), + NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes), + case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> RaMachine = ra_machine(NewQ), case ra:start_cluster(RaName, RaMachine, @@ -150,8 +153,9 @@ declare(#amqqueue{name = QName, ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -ra_machine_config(Q = #amqqueue{name = QName, - pid = {Name, _}}) -> +ra_machine_config(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + {Name, _} = amqqueue:get_pid(Q), %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), @@ -163,7 +167,8 @@ ra_machine_config(Q = #amqqueue{name = QName, max_bytes => MaxBytes, single_active_consumer_on => single_active_consumer_on(Q)}. -single_active_consumer_on(#amqqueue{arguments = QArguments}) -> +single_active_consumer_on(Q) -> + QArguments = amqqueue:get_arguments(Q), case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of {bool, true} -> true; _ -> false @@ -200,9 +205,10 @@ local_or_remote_handler(ChPid, Module, Function, Args) -> end. become_leader(QName, Name) -> - Fun = fun(Q1) -> - Q1#amqqueue{pid = {Name, node()}, - state = live} + Fun = fun (Q1) -> + amqqueue:set_state( + amqqueue:set_pid(Q1, {Name, node()}), + live) end, %% as this function is called synchronously when a ra node becomes leader %% we need to ensure there is no chance of blocking as else the ra node @@ -213,7 +219,8 @@ become_leader(QName, Name) -> rabbit_amqqueue:update(QName, Fun) end), case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{quorum_nodes = Nodes}} -> + {ok, Q0} when ?is_amqqueue(Q0) -> + Nodes = amqqueue:get_quorum_nodes(Q0), [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName], ?TICK_TIME) || Node <- Nodes, Node =/= node()]; @@ -263,9 +270,10 @@ reductions(Name) -> recover(Queues) -> [begin + {Name, _} = amqqueue:get_pid(Q0), + Nodes = amqqueue:get_quorum_nodes(Q0), case ra:restart_server({Name, node()}) of ok -> - % queue was restarted, good ok; {error, Err} @@ -298,20 +306,24 @@ recover(Queues) -> %% So many code paths are dependent on this. {ok, Q} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q0), Q - end || #amqqueue{pid = {Name, _}, - quorum_nodes = Nodes} = Q0 <- Queues]. + end || Q0 <- Queues]. stop(VHost) -> - _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], + _ = [begin + Pid = amqqueue:get_pid(Q), + ra:stop_server(Pid) + end || Q <- find_quorum_queues(VHost)], ok. --spec delete(#amqqueue{}, +-spec delete(amqqueue:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. -delete(#amqqueue{type = quorum, pid = {Name, _}, - name = QName, quorum_nodes = QNodes} = Q, - _IfUnused, _IfEmpty, ActingUser) -> +delete(Q, + _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> + {Name, _} = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), + QNodes = amqqueue:get_quorum_nodes(Q), %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = ?DELETE_TIMEOUT, {ok, ReadyMsgs, _} = stat(Q), @@ -388,12 +400,14 @@ reject(false, CTag, MsgIds, QState) -> credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). --spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(), +-spec basic_get(amqqueue:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> {'ok', 'empty', rabbit_fifo_client:state()} | {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. -basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck, - CTag0, QState0) -> + +basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) -> + QName = amqqueue:get_name(Q), + Id = amqqueue:get_pid(Q), CTag = quorum_ctag(CTag0), Settlement = case NoAck of true -> @@ -413,17 +427,20 @@ basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck, {error, timeout} end. --spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(), +-spec basic_consume(amqqueue:amqqueue(), NoAck :: boolean(), ChPid :: pid(), ConsumerPrefetchCount :: non_neg_integer(), rabbit_types:ctag(), ExclusiveConsume :: boolean(), Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid, + +basic_consume(Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, - ActingUser, OkMsg, QState0) -> + ActingUser, OkMsg, QState0) when ?amqqueue_is_quorum(Q) -> %% TODO: validate consumer arguments %% currently quorum queues do not support any arguments + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), maybe_send_reply(ChPid, OkMsg), ConsumerTag = quorum_ctag(ConsumerTag0), %% A prefetch count of 0 means no limitation, @@ -514,9 +531,12 @@ requeue(ConsumerTag, MsgIds, QState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState). cleanup_data_dir() -> - Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} - <- rabbit_amqqueue:list_by_type(quorum), - lists:member(node(), Nodes)], + Names = [begin + {Name, _} = amqqueue:get_pid(Q), + Name + end + || Q <- rabbit_amqqueue:list_by_type(quorum), + lists:member(node(), amqqueue:get_quorum_nodes(Q))], Registered = ra_directory:list_registered(), _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, not lists:member(Name, Names)], @@ -552,9 +572,11 @@ status(Vhost, QueueName) -> QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, RName = qname_to_rname(QName), case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{type = classic}} -> + {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; - {ok, #amqqueue{pid = {_, Leader}, quorum_nodes = Nodes}} -> + {ok, Q} when ?amqqueue_is_quorum(Q) -> + {_, Leader} = amqqueue:get_pid(Q), + Nodes = amqqueue:get_quorum_nodes(Q), Info = [{leader, Leader}, {members, Nodes}], case ets:lookup(ra_state, RName) of [{_, State}] -> @@ -569,9 +591,10 @@ status(Vhost, QueueName) -> add_member(VHost, Name, Node) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{type = classic}} -> + {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; - {ok, #amqqueue{quorum_nodes = QNodes} = Q} -> + {ok, Q} when ?amqqueue_is_quorum(Q) -> + QNodes = amqqueue:get_quorum_nodes(Q), case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of false -> {error, node_not_running}; @@ -587,8 +610,10 @@ add_member(VHost, Name, Node) -> E end. -add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName, - quorum_nodes = QNodes} = Q, Node) -> +add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> + {RaName, _} = ServerRef = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), + QNodes = amqqueue:get_quorum_nodes(Q), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, case ra:start_server(RaName, ServerId, ra_machine(Q), @@ -597,9 +622,10 @@ add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName, case ra:add_member(ServerRef, ServerId) of {ok, _, Leader} -> Fun = fun(Q1) -> - Q1#amqqueue{quorum_nodes = - [Node | Q1#amqqueue.quorum_nodes], - pid = Leader} + Q2 = amqqueue:set_quorum_nodes( + Q1, + [Node | amqqueue:get_quorum_nodes(Q1)]), + amqqueue:set_pid(Q2, Leader) end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), @@ -615,9 +641,10 @@ add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName, delete_member(VHost, Name, Node) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{type = classic}} -> + {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; - {ok, #amqqueue{quorum_nodes = QNodes} = Q} -> + {ok, Q} when ?amqqueue_is_quorum(Q) -> + QNodes = amqqueue:get_quorum_nodes(Q), case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of false -> {error, node_not_running}; @@ -633,13 +660,16 @@ delete_member(VHost, Name, Node) -> E end. -delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) -> +delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> + QName = amqqueue:get_name(Q), + {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, case ra:leave_and_delete_server(ServerId) of ok -> Fun = fun(Q1) -> - Q1#amqqueue{quorum_nodes = - lists:delete(Node, Q1#amqqueue.quorum_nodes)} + amqqueue:set_quorum_nodes( + Q1, + lists:delete(Node, amqqueue:get_quorum_nodes(Q1))) end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), @@ -654,16 +684,18 @@ dlx_mfa(Q) -> fun res_arg/2, Q), Q), DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q), - {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}. + {?MODULE, dead_letter_publish, [DLX, DLXRKey, amqqueue:get_name(Q)]}. init_dlx(undefined, _Q) -> undefined; -init_dlx(DLX, #amqqueue{name = QName}) -> +init_dlx(DLX, Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), rabbit_misc:r(QName, exchange, DLX). res_arg(_PolVal, ArgVal) -> ArgVal. -args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> +args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) -> + Args = amqqueue:get_arguments(Q), AName = <<"x-", Name/binary>>, case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of {undefined, undefined} -> undefined; @@ -693,29 +725,32 @@ find_quorum_queues(VHost) -> Node = node(), mnesia:async_dirty( fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{vhost = VH, - pid = Pid, - type = quorum} - <- mnesia:table(rabbit_durable_queue), - VH =:= VHost, - qnode(Pid) == Node])) + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + ?amqqueue_is_quorum(Q), + amqqueue:get_vhost(Q) =:= VHost, + amqqueue:qnode(Q) == Node])) end). -i(name, #amqqueue{name = Name}) -> Name; -i(durable, #amqqueue{durable = Dur}) -> Dur; -i(auto_delete, #amqqueue{auto_delete = AD}) -> AD; -i(arguments, #amqqueue{arguments = Args}) -> Args; -i(pid, #amqqueue{pid = {Name, _}}) -> whereis(Name); -i(messages, #amqqueue{pid = {Name, _}}) -> +i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q); +i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q); +i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q); +i(arguments, Q) when ?is_amqqueue(Q) -> amqqueue:get_arguments(Q); +i(pid, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), + whereis(Name); +i(messages, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), quorum_messages(Name); -i(messages_ready, #amqqueue{name = QName}) -> +i(messages_ready, Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), case ets:lookup(queue_coarse_metrics, QName) of [{_, MR, _, _, _}] -> MR; [] -> 0 end; -i(messages_unacknowledged, #amqqueue{name = QName}) -> +i(messages_unacknowledged, Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), case ets:lookup(queue_coarse_metrics, QName) of [{_, _, MU, _, _}] -> MU; @@ -737,14 +772,16 @@ i(effective_policy_definition, Q) -> undefined -> []; Def -> Def end; -i(consumers, #amqqueue{name = QName}) -> +i(consumers, Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), case ets:lookup(queue_metrics, QName) of [{_, M, _}] -> proplists:get_value(consumers, M, 0); [] -> 0 end; -i(memory, #amqqueue{pid = {Name, _}}) -> +i(memory, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), try {memory, M} = process_info(whereis(Name), memory), M @@ -752,33 +789,38 @@ i(memory, #amqqueue{pid = {Name, _}}) -> error:badarg -> 0 end; -i(state, #amqqueue{pid = {Name, Node}}) -> +i(state, Q) when ?is_amqqueue(Q) -> + {Name, Node} = amqqueue:get_pid(Q), %% Check against the leader or last known leader case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of {badrpc, _} -> down; State -> State end; -i(local_state, #amqqueue{pid = {Name, _}}) -> +i(local_state, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), case ets:lookup(ra_state, Name) of [{_, State}] -> State; _ -> not_member end; -i(garbage_collection, #amqqueue{pid = {Name, _}}) -> +i(garbage_collection, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), try rabbit_misc:get_gc_info(whereis(Name)) catch error:badarg -> [] end; -i(members, #amqqueue{quorum_nodes = Nodes}) -> - Nodes; +i(members, Q) when ?is_amqqueue(Q) -> + amqqueue:get_quorum_nodes(Q); i(online, Q) -> online(Q); i(leader, Q) -> leader(Q); -i(open_files, #amqqueue{pid = {Name, _}, - quorum_nodes = Nodes}) -> +i(open_files, Q) when ?is_amqqueue(Q) -> + {Name, _} = amqqueue:get_pid(Q), + Nodes = amqqueue:get_quorum_nodes(Q), {Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]), lists:flatten(Data); -i(single_active_consumer_pid, #amqqueue{pid = QPid}) -> +i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), {ok, {_, SacResult}, _} = ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1), case SacResult of @@ -787,7 +829,8 @@ i(single_active_consumer_pid, #amqqueue{pid = QPid}) -> _ -> '' end; -i(single_active_consumer_ctag, #amqqueue{pid = QPid}) -> +i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), {ok, {_, SacResult}, _} = ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1), case SacResult of @@ -807,17 +850,20 @@ open_files(Name) -> end end. -leader(#amqqueue{pid = {Name, Leader}}) -> +leader(Q) when ?is_amqqueue(Q) -> + {Name, Leader} = amqqueue:get_pid(Q), case is_process_alive(Name, Leader) of true -> Leader; false -> '' end. -online(#amqqueue{quorum_nodes = Nodes, - pid = {Name, _Leader}}) -> +online(Q) when ?is_amqqueue(Q) -> + Nodes = amqqueue:get_quorum_nodes(Q), + {Name, _} = amqqueue:get_pid(Q), [Node || Node <- Nodes, is_process_alive(Name, Node)]. -format(#amqqueue{quorum_nodes = Nodes} = Q) -> +format(Q) when ?is_amqqueue(Q) -> + Nodes = amqqueue:get_quorum_nodes(Q), [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> @@ -839,11 +885,6 @@ quorum_ctag(Other) -> maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -qnode(QPid) when is_pid(QPid) -> - node(QPid); -qnode({_, Node}) -> - Node. - check_invalid_arguments(QueueName, Args) -> Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>], @@ -856,7 +897,8 @@ check_invalid_arguments(QueueName, Args) -> end || Key <- Keys], ok. -check_auto_delete(#amqqueue{auto_delete = true, name = Name}) -> +check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) -> + Name = amqqueue:get_name(Q), rabbit_misc:protocol_error( precondition_failed, "invalid property 'auto-delete' for ~s", @@ -864,18 +906,19 @@ check_auto_delete(#amqqueue{auto_delete = true, name = Name}) -> check_auto_delete(_) -> ok. -check_exclusive(#amqqueue{exclusive_owner = none}) -> +check_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> ok; -check_exclusive(#amqqueue{name = Name}) -> +check_exclusive(Q) when ?is_amqqueue(Q) -> + Name = amqqueue:get_name(Q), rabbit_misc:protocol_error( precondition_failed, "invalid property 'exclusive-owner' for ~s", [rabbit_misc:rs(Name)]). -check_non_durable(#amqqueue{durable = true}) -> +check_non_durable(Q) when ?amqqueue_is_durable(Q) -> ok; -check_non_durable(#amqqueue{name = Name, - durable = false}) -> +check_non_durable(Q) when not ?amqqueue_is_durable(Q) -> + Name = amqqueue:get_name(Q), rabbit_misc:protocol_error( precondition_failed, "invalid property 'non-durable' for ~s", diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 1fab94fe34..272e3966b2 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -24,7 +24,7 @@ %% for testing purposes -export([definitions/0]). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). %%---------------------------------------------------------------------------- -type retry() :: boolean(). @@ -349,13 +349,13 @@ definitions() -> {match, #runtime_parameters{_='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, + {attributes, amqqueue:fields()}, {disc_copies, [node()]}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + {match, amqqueue:pattern_match_on_name(queue_name_match())}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + {attributes, amqqueue:fields()}, + {match, amqqueue:pattern_match_on_name(queue_name_match())}]}] ++ gm:table_definitions() ++ mirrored_supervisor:table_definitions(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 6be812dad3..76a8b59e31 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -60,8 +60,6 @@ -rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}). -rabbit_upgrade({topic_permission, mnesia, []}). -rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}). --rabbit_upgrade({queue_type, mnesia, [queue_options]}). --rabbit_upgrade({queue_quorum_nodes, mnesia, [queue_type]}). -rabbit_upgrade({exchange_options, mnesia, [operator_policies]}). %% TODO: move that to feature flags @@ -103,8 +101,6 @@ -spec operator_policies() -> 'ok'. -spec queue_vhost_field() -> 'ok'. -spec queue_options() -> 'ok'. --spec queue_type() -> 'ok'. --spec queue_quorum_nodes() -> 'ok'. -spec exchange_options() -> 'ok'. -spec remove_explicit_default_exchange_bindings() -> 'ok'. @@ -584,47 +580,6 @@ queue_options(Table) -> sync_slave_pids, recoverable_slaves, policy, operator_policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options]). -queue_type() -> - ok = queue_type(rabbit_queue), - ok = queue_type(rabbit_durable_queue), - ok. - -queue_type(Table) -> - transform( - Table, - fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, - Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, - State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options}) -> - {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, - Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, - State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, classic} - end, - [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, - sync_slave_pids, recoverable_slaves, policy, operator_policy, - gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options, - type]). - -queue_quorum_nodes() -> - ok = queue_quorum_nodes(rabbit_queue), - ok = queue_quorum_nodes(rabbit_durable_queue), - ok. - -queue_quorum_nodes(Table) -> - transform( - Table, - fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, - Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, - State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, Type}) -> - {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, - Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, - State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, Type, - undefined} - end, - [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, - sync_slave_pids, recoverable_slaves, policy, operator_policy, - gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options, - type, quorum_nodes]). - %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4da073a518..f0ad0eb6e1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -356,8 +356,9 @@ -define(QUEUE, lqueue). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include("amqqueue.hrl"). %%---------------------------------------------------------------------------- @@ -532,8 +533,9 @@ init(Queue, Recover, Callback) -> fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). -init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> +init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> + QueueName = amqqueue:get_name(Q), + IsDurable = amqqueue:is_durable(Q), IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), VHost = QueueName#resource.virtual_host, @@ -547,8 +549,9 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, VHost), VHost); %% We can be recovering a transient queue if it crashed -init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> +init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> + QueueName = amqqueue:get_name(Q), + IsDurable = amqqueue:is_durable(Q), {PRef, RecoveryTerms} = process_recovery_terms(Terms), VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = @@ -620,7 +623,8 @@ delete_and_terminate(_Reason, State) -> rabbit_msg_store:client_delete_and_terminate(MSCStateT), a(State2 #vqstate { msg_store_clients = undefined }). -delete_crashed(#amqqueue{name = QName}) -> +delete_crashed(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), ok = rabbit_queue_index:erase(QName). purge(State = #vqstate { len = Len }) -> @@ -2825,7 +2829,7 @@ move_messages_to_vhost_store(Queues) -> %% Move the queue index for each persistent queue to the new store lists:foreach( fun(Queue) -> - #amqqueue{name = QueueName} = Queue, + QueueName = amqqueue:get_name(Queue), rabbit_queue_index:move_to_per_vhost_stores(QueueName) end, Queues), @@ -2938,17 +2942,16 @@ list_persistent_queues() -> Node = node(), mnesia:async_dirty( fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, - pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node, - mnesia:read(rabbit_queue, Name, read) =:= []])) + qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), + ?amqqueue_is_classic(Q), + amqqueue:qnode(Q) == Node, + mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []])) end). read_old_recovery_terms([]) -> {[], [], ?EMPTY_START_FUN_STATE}; read_old_recovery_terms(Queues) -> - QueueNames = [Name || #amqqueue{name = Name} <- Queues], + QueueNames = [amqqueue:get_name(Q) || Q <- Queues], {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames), Refs = [Ref || Terms <- AllTerms, Terms /= non_clean_shutdown, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index cf12d04ce8..75f96ac1eb 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -16,7 +16,7 @@ -module(rabbit_vhost). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -72,8 +72,8 @@ recover(VHost) -> ok = rabbit_file:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), Qs = rabbit_amqqueue:recover(VHost), - ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), - [QName || #amqqueue{name = QName} <- Qs]), + QNames = [amqqueue:get_name(Q) || Q <- Qs], + ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), ok = rabbit_amqqueue:start(Qs), %% Start queue mirrors. ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), @@ -142,8 +142,10 @@ delete(VHostPath, ActingUser) -> %% notifications which must be sent outside the TX rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]), QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end, - [assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) || - #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)], + [begin + Name = amqqueue:get_name(Q), + assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) + end || Q <- rabbit_amqqueue:list(VHostPath)], [assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], Funs = rabbit_misc:execute_mnesia_transaction( @@ -226,7 +228,8 @@ assert_benign({error, not_found}, _) -> ok; assert_benign({error, {absent, Q, _}}, ActingUser) -> %% Removing the mnesia entries here is safe. If/when the down node %% restarts, it will clear out the on-disk storage of the queue. - case rabbit_amqqueue:internal_delete(Q#amqqueue.name, ActingUser) of + QName = amqqueue:get_name(Q), + case rabbit_amqqueue:internal_delete(QName, ActingUser) of ok -> ok; {error, not_found} -> ok end. diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl new file mode 100644 index 0000000000..05a049c9bb --- /dev/null +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -0,0 +1,302 @@ +-module(amqqueue_backward_compatibility_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include("amqqueue.hrl"). + +-export([all/0, + groups/0, + init_per_suite/2, + end_per_suite/2, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + new_amqqueue_v1_is_amqqueue/1, + new_amqqueue_v2_is_amqqueue/1, + random_term_is_not_amqqueue/1, + + amqqueue_v1_is_durable/1, + amqqueue_v2_is_durable/1, + random_term_is_not_durable/1, + + amqqueue_v1_state_matching/1, + amqqueue_v2_state_matching/1, + random_term_state_matching/1, + + amqqueue_v1_type_matching/1, + amqqueue_v2_type_matching/1, + random_term_type_matching/1, + + upgrade_v1_to_v2/1 + ]). + +-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, + n, o, p, q, r, s, t, u, v, w, x, y, z}). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + [ + {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, + new_amqqueue_v2_is_amqqueue, + random_term_is_not_amqqueue, + amqqueue_v1_is_durable, + amqqueue_v2_is_durable, + random_term_is_not_durable, + amqqueue_v1_state_matching, + amqqueue_v2_state_matching, + random_term_state_matching, + amqqueue_v1_type_matching, + amqqueue_v2_type_matching, + random_term_type_matching]} + ]. + +init_per_suite(_, Config) -> Config. +end_per_suite(_, Config) -> Config. + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(_, Config) -> Config. +end_per_testcase(_, Config) -> Config. + +new_amqqueue_v1_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v1(Queue)), + ?assert(not ?is_amqqueue_v2(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +new_amqqueue_v2_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), + Queue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v2(Queue)), + ?assert(not ?is_amqqueue_v1(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +random_term_is_not_amqqueue(_) -> + Term = ?long_tuple, + ?assert(not ?is_amqqueue(Term)), + ?assert(not ?is_amqqueue_v2(Term)), + ?assert(not ?is_amqqueue_v1(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + DurableQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +amqqueue_v2_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + DurableQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +random_term_is_not_durable(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_durable(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +amqqueue_v2_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +random_term_state_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_state_is(Term, live)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)). + +amqqueue_v2_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + ClassicQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_is_classic(ClassicQueue)), + ?assert(amqqueue:is_classic(ClassicQueue)), + ?assert(not ?amqqueue_is_quorum(ClassicQueue)), + ?assert(not amqqueue:is_quorum(ClassicQueue)), + QuorumQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + quorum), + ?assert(not ?amqqueue_is_classic(QuorumQueue)), + ?assert(not amqqueue:is_classic(QuorumQueue)), + ?assert(?amqqueue_is_quorum(QuorumQueue)), + ?assert(amqqueue:is_quorum(QuorumQueue)). + +random_term_type_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_classic(Term)), + ?assert(not ?amqqueue_is_quorum(Term)), + ?assertException(error, function_clause, amqqueue:is_classic(Term)), + ?assertException(error, function_clause, amqqueue:is_quorum(Term)). + +%% ------------------------------------------------------------------- + +upgrade_v1_to_v2(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + OldQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue_v1(OldQueue)), + ?assert(not ?is_amqqueue_v2(OldQueue)), + NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), + ?assert(not ?is_amqqueue_v1(NewQueue)), + ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 433bc66bff..bc9b2c8ced 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile(export_all). @@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) -> bq_variable_queue_delete_msg_store_files_callback1(Config) -> ok = restart_msg_store_empty(), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare( - queue_name(Config, - <<"bq_variable_queue_delete_msg_store_files_callback-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), Payload = <<0:8388608>>, %% 1MB Count = 30, publish_and_confirm(Q, Payload, Count), @@ -718,9 +718,10 @@ bq_queue_recover(Config) -> bq_queue_recover1(Config) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_queue_recover-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), publish_and_confirm(Q, <<>>, Count), SupPid = get_queue_sup_pid(Q), @@ -736,7 +737,8 @@ bq_queue_recover1(Config) -> {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, - fun (Q1 = #amqqueue { pid = QPid1 }) -> + fun (Q1) when ?is_amqqueue(Q1) -> + QPid1 = amqqueue:get_pid(Q1), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, @@ -752,7 +754,9 @@ bq_queue_recover1(Config) -> passed. %% Return the PID of the given queue's supervisor. -get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) -> +get_queue_sup_pid(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), VHost = QName#resource.virtual_host, {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), Sups = supervisor:which_children(AmqSup), @@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> end, {VQ, []}, lists:seq(1, Count)). test_amqqueue(QName, Durable) -> - (rabbit_amqqueue:pseudo_queue(QName, self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(QName, self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 9bfa0ae07a..77da6133d8 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile([export_all]). @@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node), rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, consumers_all, [VHost]). -get_amqqueue(Q, []) -> throw({not_found, Q}); -get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ; -get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem). +get_amqqueue(QName0, []) -> + throw({not_found, QName0}); +get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) -> + QName1 = amqqueue:get_name(Q), + compare_amqqueue(QName0, QName1, Q, Rem). + +compare_amqqueue(QName, QName, Q, _Rem) -> + Q; +compare_amqqueue(QName, _, _, Rem) -> + get_amqqueue(QName, Rem). qconfig(Ch, Name, Ex, Consume, Deliver) -> [{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}]. diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index c52dc9ef64..dc30825f8c 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("include/amqqueue.hrl"). -compile(export_all). @@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) -> Self = self(), Pid = spawn(SecondaryNode, fun () -> - {new, #amqqueue{name = QueueName, pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], - none, <<"acting-user">>), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QueueName = ?amqqueue_field_name(Q), + QPid = ?amqqueue_field_pid(Q), exit(QPid, kill), Self ! {self(), killed, QPid} end), @@ -269,12 +270,12 @@ must_exit(Fun) -> end. dead_queue_loop(QueueName, OldPid) -> - {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, - <<"acting-user">>), - case Q#amqqueue.pid of + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QPid = ?amqqueue_field_pid(Q), + case QPid of OldPid -> timer:sleep(25), dead_queue_loop(QueueName, OldPid); - _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + _ -> true = rabbit_misc:is_process_alive(QPid), Q end. diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 120257feb9..5ae2fb687c 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -315,14 +315,14 @@ forget_offline_removes_things(Config) -> forget_promotes_offline_slave(Config) -> [A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), - Q = <<"mirrored-queue">>, - declare(ACh, Q), - set_ha_policy(Config, Q, A, [B, C]), - set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves + QName = <<"mirrored-queue">>, + declare(ACh, QName), + set_ha_policy(Config, QName, A, [B, C]), + set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves %% Publish and confirm amqp_channel:call(ACh, #'confirm.select'{}), - amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q}, + amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}}), amqp_channel:wait_for_confirms(ACh), @@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, D), DCh2 = rabbit_ct_client_helpers:open_channel(Config, D), - #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q), + #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName), ok. -set_ha_policy(Config, Q, Master, Slaves) -> +set_ha_policy(Config, QName, Master, Slaves) -> Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]], - rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q, - {<<"nodes">>, Nodes}), - await_slaves(Q, Master, Slaves). - -await_slaves(Q, Master, Slaves) -> - {ok, #amqqueue{pid = MPid, - slave_pids = SPids}} = - rpc:call(Master, rabbit_amqqueue, lookup, - [rabbit_misc:r(<<"/">>, queue, Q)]), - ActMaster = node(MPid), + HaPolicy = {<<"nodes">>, Nodes}, + rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy), + await_slaves(QName, Master, Slaves). + +await_slaves(QName, Master, Slaves) -> + await_slaves_0(QName, Master, Slaves, 10). + +await_slaves_0(QName, Master, Slaves0, Tries) -> + {ok, Queue} = await_slaves_lookup_queue(QName, Master), + SPids = amqqueue:get_slave_pids(Queue), + ActMaster = amqqueue:qnode(Queue), ActSlaves = lists:usort([node(P) || P <- SPids]), - case {Master, lists:usort(Slaves)} of - {ActMaster, ActSlaves} -> ok; - _ -> timer:sleep(100), - await_slaves(Q, Master, Slaves) + Slaves1 = lists:usort(Slaves0), + await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries). + +await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) -> + error({timeout_waiting_for_slaves, QName}); +await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) -> + case {Master, Slaves} of + {ActMaster, ActSlaves} -> + ok; + _ -> + timer:sleep(250), + await_slaves_0(QName, Master, Slaves, Tries - 1) + end. + +await_slaves_lookup_queue(QName, Master) -> + await_slaves_lookup_queue(QName, Master, 10). + +await_slaves_lookup_queue(QName, _Master, 0) -> + error({timeout_looking_up_queue, QName}); +await_slaves_lookup_queue(QName, Master, Tries) -> + RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)], + case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of + {error, not_found} -> + timer:sleep(250), + await_slaves_lookup_queue(QName, Master, Tries - 1); + {ok, Q} -> + {ok, Q} end. force_boot(Config) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 2d91083abc..7b8ec91346 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -217,9 +217,10 @@ kill_queue(Node, QName) -> await_new_pid(Node, QName, Pid1). queue_pid(Node, QName) -> - #amqqueue{pid = QPid, - state = State, - name = #resource{virtual_host = VHost}} = lookup(Node, QName), + Q = lookup(Node, QName), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), case State of crashed -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 9db866e3c6..8a0ab98241 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"info_head_message_timestamp-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), %% The queue is empty: no timestamp. true = PQ:is_empty(BQS1), '' = PQ:info(head_message_timestamp, BQS1), @@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) -> ram_duration(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), {_Duration1, BQS2} = PQ:ram_duration(BQS1), BQS3 = PQ:set_ram_duration_target(infinity, BQS2), BQS4 = PQ:set_ram_duration_target(1, BQS3), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 9f40bc7b0c..0f9010f204 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -2228,11 +2228,10 @@ assert_queue_type(Server, Q, Expected) -> Actual = get_queue_type(Server, Q), Expected = Actual. -get_queue_type(Server, Q) -> - QNameRes = rabbit_misc:r(<<"/">>, queue, Q), - {ok, AMQQueue} = - rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), - AMQQueue#amqqueue.type. +get_queue_type(Server, Q0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q0), + {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + amqqueue:get_type(Q1). wait_for_messages(Config, Stats) -> wait_for_messages(Config, lists:sort(Stats), 60). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 7ae43aa7e1..42ac863ead 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -364,11 +364,9 @@ cluster_queue_metrics(Config) -> % Synchronize Name = rabbit_misc:r(VHost, queue, QueueName), - [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, - ets, lookup, - [rabbit_queue, Name]), - ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, - sync_mirrors, [QPid]), + [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]), + QPid = amqqueue:get_pid(Q), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]), % Check ETS table for data wait_for(fun () -> diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index e3e282f233..d2db382e30 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -568,7 +568,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d028ed3ccf..f4f4971517 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -283,8 +283,7 @@ wait_for_confirms(Unconfirmed) -> end. test_amqqueue(Durable) -> - (rabbit_amqqueue:pseudo_queue(test_queue(), self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of @@ -709,7 +708,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), @@ -958,7 +957,7 @@ confirms1(_Config) -> QName2 = DeclareBindDurableQueue(), %% Get the first one's pid (we'll crash it later) {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), - QPid1 = Q1#amqqueue.pid, + QPid1 = amqqueue:get_pid(Q1), %% Enable confirms rabbit_channel:do(Ch, #'confirm.select'{}), receive diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 2de5819b54..5b6a6bd6ff 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - #amqqueue{sync_slave_pids = [Pid]} = Q, + [Pid] = amqqueue:get_sync_slave_pids(Q), Node1 = node(Pid), |
