summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/amqqueue.hrl113
-rw-r--r--include/amqqueue_v1.hrl20
-rw-r--r--include/amqqueue_v2.hrl22
-rw-r--r--src/amqqueue.erl682
-rw-r--r--src/amqqueue_v1.erl403
-rw-r--r--src/rabbit_amqqueue.erl605
-rw-r--r--src/rabbit_amqqueue_process.erl138
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl4
-rw-r--r--src/rabbit_backing_queue.erl273
-rw-r--r--src/rabbit_binding.erl10
-rw-r--r--src/rabbit_channel.erl51
-rw-r--r--src/rabbit_core_ff.erl53
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_fhc_helpers.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl29
-rw-r--r--src/rabbit_mirror_queue_master.erl90
-rw-r--r--src/rabbit_mirror_queue_misc.erl170
-rw-r--r--src/rabbit_mirror_queue_slave.erl150
-rw-r--r--src/rabbit_policy.erl88
-rw-r--r--src/rabbit_prequeue.erl25
-rw-r--r--src/rabbit_priority_queue.erl19
-rw-r--r--src/rabbit_queue_decorator.erl36
-rw-r--r--src/rabbit_queue_location_client_local.erl6
-rw-r--r--src/rabbit_queue_location_min_masters.erl5
-rw-r--r--src/rabbit_queue_location_random.erl5
-rw-r--r--src/rabbit_queue_location_validator.erl5
-rw-r--r--src/rabbit_queue_master_location_misc.erl31
-rw-r--r--src/rabbit_queue_master_locator.erl28
-rw-r--r--src/rabbit_quorum_queue.erl241
-rw-r--r--src/rabbit_table.erl10
-rw-r--r--src/rabbit_upgrade_functions.erl45
-rw-r--r--src/rabbit_variable_queue.erl31
-rw-r--r--src/rabbit_vhost.erl15
-rw-r--r--test/amqqueue_backward_compatibility_SUITE.erl302
-rw-r--r--test/backing_queue_SUITE.erl27
-rw-r--r--test/channel_operation_timeout_SUITE.erl14
-rw-r--r--test/cluster_SUITE.erl15
-rw-r--r--test/clustering_management_SUITE.erl66
-rw-r--r--test/crashing_queues_SUITE.erl7
-rw-r--r--test/priority_queue_SUITE.erl8
-rw-r--r--test/quorum_queue_SUITE.erl9
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl8
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl7
-rw-r--r--test/vhost_SUITE.erl2
46 files changed, 3022 insertions, 858 deletions
diff --git a/include/amqqueue.hrl b/include/amqqueue.hrl
new file mode 100644
index 0000000000..d08a740d95
--- /dev/null
+++ b/include/amqqueue.hrl
@@ -0,0 +1,113 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-include("amqqueue_v1.hrl").
+-include("amqqueue_v2.hrl").
+
+-define(is_amqqueue(Q),
+ (?is_amqqueue_v2(Q) orelse
+ ?is_amqqueue_v1(Q))).
+
+-define(amqqueue_is_auto_delete(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_auto_delete(Q) =:= true) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_field_auto_delete(Q) =:= true))).
+
+-define(amqqueue_is_durable(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_durable(Q) =:= true) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_field_durable(Q) =:= true))).
+
+-define(amqqueue_exclusive_owner_is(Q, Owner),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_field_exclusive_owner(Q) =:= Owner))).
+
+-define(amqqueue_exclusive_owner_is_pid(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ is_pid(?amqqueue_v2_field_exclusive_owner(Q))) orelse
+ (?is_amqqueue_v1(Q) andalso
+ is_pid(?amqqueue_v1_field_exclusive_owner(Q))))).
+
+-define(amqqueue_state_is(Q, State),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_state(Q) =:= State) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_field_state(Q) =:= State))).
+
+-define(amqqueue_v1_type, classic).
+
+-define(amqqueue_is_classic(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_type(Q) =:= classic) orelse
+ ?is_amqqueue_v1(Q))).
+
+-define(amqqueue_is_quorum(Q),
+ (?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_type(Q) =:= quorum) orelse
+ false).
+
+-define(amqqueue_has_valid_pid(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ is_pid(?amqqueue_v2_field_pid(Q))) orelse
+ (?is_amqqueue_v1(Q) andalso
+ is_pid(?amqqueue_v1_field_pid(Q))))).
+
+-define(amqqueue_pid_runs_on_local_node(Q),
+ ((?is_amqqueue_v2(Q) andalso
+ node(?amqqueue_v2_field_pid(Q)) =:= node()) orelse
+ (?is_amqqueue_v1(Q) andalso
+ node(?amqqueue_v1_field_pid(Q)) =:= node()))).
+
+-define(amqqueue_pid_equals(Q, Pid),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_field_pid(Q) =:= Pid) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_field_pid(Q) =:= Pid))).
+
+-define(amqqueue_pids_are_equal(Q0, Q1),
+ ((?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso
+ ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1)) orelse
+ (?is_amqqueue_v1(Q0) andalso ?is_amqqueue_v1(Q1) andalso
+ ?amqqueue_v1_field_pid(Q0) =:= ?amqqueue_v1_field_pid(Q1)))).
+
+-define(amqqueue_field_name(Q),
+ case ?is_amqqueue_v2(Q) of
+ true -> ?amqqueue_v2_field_name(Q);
+ false -> case ?is_amqqueue_v1(Q) of
+ true -> ?amqqueue_v1_field_name(Q)
+ end
+ end).
+
+-define(amqqueue_field_pid(Q),
+ case ?is_amqqueue_v2(Q) of
+ true -> ?amqqueue_v2_field_pid(Q);
+ false -> case ?is_amqqueue_v1(Q) of
+ true -> ?amqqueue_v1_field_pid(Q)
+ end
+ end).
+
+-define(amqqueue_v1_vhost(Q), element(2, ?amqqueue_v1_field_name(Q))).
+-define(amqqueue_v2_vhost(Q), element(2, ?amqqueue_v2_field_name(Q))).
+
+-define(amqqueue_vhost_equals(Q, VHost),
+ ((?is_amqqueue_v2(Q) andalso
+ ?amqqueue_v2_vhost(Q) =:= VHost) orelse
+ (?is_amqqueue_v1(Q) andalso
+ ?amqqueue_v1_vhost(Q) =:= VHost))).
diff --git a/include/amqqueue_v1.hrl b/include/amqqueue_v1.hrl
new file mode 100644
index 0000000000..04b2d72850
--- /dev/null
+++ b/include/amqqueue_v1.hrl
@@ -0,0 +1,20 @@
+-define(is_amqqueue_v1(Q), is_record(Q, amqqueue, 19)).
+
+-define(amqqueue_v1_field_name(Q), element(2, Q)).
+-define(amqqueue_v1_field_durable(Q), element(3, Q)).
+-define(amqqueue_v1_field_auto_delete(Q), element(4, Q)).
+-define(amqqueue_v1_field_exclusive_owner(Q), element(5, Q)).
+-define(amqqueue_v1_field_arguments(Q), element(6, Q)).
+-define(amqqueue_v1_field_pid(Q), element(7, Q)).
+-define(amqqueue_v1_field_slave_pids(Q), element(8, Q)).
+-define(amqqueue_v1_field_sync_slave_pids(Q), element(9, Q)).
+-define(amqqueue_v1_field_recoverable_slaves(Q), element(10, Q)).
+-define(amqqueue_v1_field_policy(Q), element(11, Q)).
+-define(amqqueue_v1_field_operator_policy(Q), element(12, Q)).
+-define(amqqueue_v1_field_gm_pids(Q), element(13, Q)).
+-define(amqqueue_v1_field_decorators(Q), element(14, Q)).
+-define(amqqueue_v1_field_state(Q), element(15, Q)).
+-define(amqqueue_v1_field_policy_version(Q), element(16, Q)).
+-define(amqqueue_v1_field_slave_pids_pending_shutdown(Q), element(17, Q)).
+-define(amqqueue_v1_field_vhost(Q), element(18, Q)).
+-define(amqqueue_v1_field_options(Q), element(19, Q)).
diff --git a/include/amqqueue_v2.hrl b/include/amqqueue_v2.hrl
new file mode 100644
index 0000000000..37cd7ba2a8
--- /dev/null
+++ b/include/amqqueue_v2.hrl
@@ -0,0 +1,22 @@
+-define(is_amqqueue_v2(Q), is_record(Q, amqqueue, 21)).
+
+-define(amqqueue_v2_field_name(Q), element(2, Q)).
+-define(amqqueue_v2_field_durable(Q), element(3, Q)).
+-define(amqqueue_v2_field_auto_delete(Q), element(4, Q)).
+-define(amqqueue_v2_field_exclusive_owner(Q), element(5, Q)).
+-define(amqqueue_v2_field_arguments(Q), element(6, Q)).
+-define(amqqueue_v2_field_pid(Q), element(7, Q)).
+-define(amqqueue_v2_field_slave_pids(Q), element(8, Q)).
+-define(amqqueue_v2_field_sync_slave_pids(Q), element(9, Q)).
+-define(amqqueue_v2_field_recoverable_slaves(Q), element(10, Q)).
+-define(amqqueue_v2_field_policy(Q), element(11, Q)).
+-define(amqqueue_v2_field_operator_policy(Q), element(12, Q)).
+-define(amqqueue_v2_field_gm_pids(Q), element(13, Q)).
+-define(amqqueue_v2_field_decorators(Q), element(14, Q)).
+-define(amqqueue_v2_field_state(Q), element(15, Q)).
+-define(amqqueue_v2_field_policy_version(Q), element(16, Q)).
+-define(amqqueue_v2_field_slave_pids_pending_shutdown(Q), element(17, Q)).
+-define(amqqueue_v2_field_vhost(Q), element(18, Q)).
+-define(amqqueue_v2_field_options(Q), element(19, Q)).
+-define(amqqueue_v2_field_type(Q), element(20, Q)).
+-define(amqqueue_v2_field_quorum_nodes(Q), element(21, Q)).
diff --git a/src/amqqueue.erl b/src/amqqueue.erl
new file mode 100644
index 0000000000..83b65cd048
--- /dev/null
+++ b/src/amqqueue.erl
@@ -0,0 +1,682 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(amqqueue). %% Could become amqqueue_v2 in the future.
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
+
+-export([new/9,
+ new_with_version/10,
+ fields/0,
+ fields/1,
+ field_vhost/0,
+ record_version_to_use/0,
+ upgrade/1,
+ upgrade_to/2,
+ % arguments
+ get_arguments/1,
+ set_arguments/2,
+ % decorators
+ get_decorators/1,
+ set_decorators/2,
+ % exclusive_owner
+ get_exclusive_owner/1,
+ % gm_pids
+ get_gm_pids/1,
+ set_gm_pids/2,
+ get_leader/1,
+ % name (#resource)
+ get_name/1,
+ set_name/2,
+ % operator_policy
+ get_operator_policy/1,
+ set_operator_policy/2,
+ get_options/1,
+ % pid
+ get_pid/1,
+ set_pid/2,
+ % policy
+ get_policy/1,
+ set_policy/2,
+ % policy_version
+ get_policy_version/1,
+ set_policy_version/2,
+ % quorum_nodes
+ get_quorum_nodes/1,
+ set_quorum_nodes/2,
+ % recoverable_slaves
+ get_recoverable_slaves/1,
+ set_recoverable_slaves/2,
+ % slave_pids
+ get_slave_pids/1,
+ set_slave_pids/2,
+ % slave_pids_pending_shutdown
+ get_slave_pids_pending_shutdown/1,
+ set_slave_pids_pending_shutdown/2,
+ % state
+ get_state/1,
+ set_state/2,
+ % sync_slave_pids
+ get_sync_slave_pids/1,
+ set_sync_slave_pids/2,
+ get_type/1,
+ get_vhost/1,
+ is_amqqueue/1,
+ is_auto_delete/1,
+ is_durable/1,
+ is_classic/1,
+ is_quorum/1,
+ pattern_match_all/0,
+ pattern_match_on_name/1,
+ pattern_match_on_type/1,
+ reset_mirroring_and_decorators/1,
+ set_immutable/1,
+ qnode/1,
+ macros/0]).
+
+-define(record_version, amqqueue_v2).
+
+-record(amqqueue, {
+ name :: rabbit_amqqueue:name() | '_', %% immutable
+ durable :: boolean() | '_', %% immutable
+ auto_delete :: boolean() | '_', %% immutable
+ exclusive_owner = none :: pid() | none | '_', %% immutable
+ arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
+ pid :: pid() | ra_server_id() | none | '_', %% durable (just so we
+ %% know home node)
+ slave_pids = [] :: [pid()] | none | '_', %% transient
+ sync_slave_pids = [] :: [pid()] | none| '_',%% transient
+ recoverable_slaves = [] :: [atom()] | none | '_', %% durable
+ policy :: binary() | none | undefined | '_', %% durable, implicit
+ %% update as above
+ operator_policy :: binary() | none | undefined | '_', %% durable,
+ %% implicit
+ %% update
+ %% as above
+ gm_pids = [] :: [pid()] | none | '_', %% transient
+ decorators :: [atom()] | none | undefined | '_', %% transient,
+ %% recalculated
+ %% as above
+ state = live :: atom() | none | '_', %% durable (have we crashed?)
+ policy_version = 0 :: non_neg_integer() | '_',
+ slave_pids_pending_shutdown = [] :: [pid()] | '_',
+ vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
+ options = #{} :: map() | '_',
+ type = ?amqqueue_v1_type :: atom() | '_',
+ quorum_nodes = [] :: [node()] | '_'
+ }).
+
+-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
+-type amqqueue_v2() :: #amqqueue{
+ name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: pid() | none,
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: pid() | ra_server_id() | none,
+ slave_pids :: [pid()] | none,
+ sync_slave_pids :: [pid()] | none,
+ recoverable_slaves :: [atom()] | none,
+ policy :: binary() | none | undefined,
+ operator_policy :: binary() | none | undefined,
+ gm_pids :: [pid()] | none,
+ decorators :: [atom()] | none | undefined,
+ state :: atom() | none,
+ policy_version :: non_neg_integer(),
+ slave_pids_pending_shutdown :: [pid()],
+ vhost :: rabbit_types:vhost() | undefined,
+ options :: map(),
+ type :: atom(),
+ quorum_nodes :: [node()]
+ }.
+
+-type ra_server_id() :: {Name :: atom(), Node :: node()}.
+
+-type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() |
+ amqqueue_v2_pattern().
+-type amqqueue_v2_pattern() :: #amqqueue{
+ name :: rabbit_amqqueue:name() | '_',
+ durable :: '_',
+ auto_delete :: '_',
+ exclusive_owner :: '_',
+ arguments :: '_',
+ pid :: '_',
+ slave_pids :: '_',
+ sync_slave_pids :: '_',
+ recoverable_slaves :: '_',
+ policy :: '_',
+ operator_policy :: '_',
+ gm_pids :: '_',
+ decorators :: '_',
+ state :: '_',
+ policy_version :: '_',
+ slave_pids_pending_shutdown :: '_',
+ vhost :: '_',
+ options :: '_',
+ type :: atom() | '_',
+ quorum_nodes :: '_'
+ }.
+
+-export_type([amqqueue/0,
+ amqqueue_v2/0,
+ amqqueue_pattern/0,
+ amqqueue_v2_pattern/0,
+ ra_server_id/0]).
+
+-spec new(rabbit_amqqueue:name(),
+ pid() | ra_server_id() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map(),
+ atom()) -> amqqueue().
+
+new(#resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ Type)
+ when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) andalso
+ is_atom(Type) ->
+ case record_version_to_use() of
+ ?record_version ->
+ new_with_version(
+ ?record_version,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ Type);
+ _ ->
+ amqqueue_v1:new(
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options)
+ end.
+
+-spec new_with_version
+(amqqueue_v1 | amqqueue_v2,
+ rabbit_amqqueue:name(),
+ pid() | ra_server_id() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map(),
+ atom()) -> amqqueue().
+
+new_with_version(?record_version,
+ #resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ Type)
+ when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) andalso
+ is_atom(Type) ->
+ #amqqueue{name = Name,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = Pid,
+ vhost = VHost,
+ options = Options,
+ type = Type};
+new_with_version(Version,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ ?amqqueue_v1_type) ->
+ amqqueue_v1:new_with_version(
+ Version,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options).
+
+-spec is_amqqueue(any()) -> boolean().
+
+is_amqqueue(#amqqueue{}) -> true;
+is_amqqueue(Queue) -> amqqueue_v1:is_amqqueue(Queue).
+
+-spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2.
+
+record_version_to_use() ->
+ case rabbit_feature_flags:is_enabled(quorum_queue) of
+ true -> ?record_version;
+ false -> amqqueue_v1:record_version_to_use()
+ end.
+
+-spec upgrade(amqqueue()) -> amqqueue().
+
+upgrade(#amqqueue{} = Queue) -> Queue;
+upgrade(OldQueue) -> upgrade_to(record_version_to_use(), OldQueue).
+
+-spec upgrade_to
+(amqqueue_v2, amqqueue()) -> amqqueue_v2();
+(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1().
+
+upgrade_to(?record_version, #amqqueue{} = Queue) ->
+ Queue;
+upgrade_to(?record_version, OldQueue) ->
+ Fields = erlang:tuple_to_list(OldQueue) ++ [?amqqueue_v1_type,
+ undefined],
+ #amqqueue{} = erlang:list_to_tuple(Fields);
+upgrade_to(Version, OldQueue) ->
+ amqqueue_v1:upgrade_to(Version, OldQueue).
+
+% arguments
+
+-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table().
+
+get_arguments(#amqqueue{arguments = Args}) ->
+ Args;
+get_arguments(Queue) ->
+ amqqueue_v1:get_arguments(Queue).
+
+-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue().
+
+set_arguments(#amqqueue{} = Queue, Args) ->
+ Queue#amqqueue{arguments = Args};
+set_arguments(Queue, Args) ->
+ amqqueue_v1:set_arguments(Queue, Args).
+
+% decorators
+
+-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
+
+get_decorators(#amqqueue{decorators = Decorators}) ->
+ Decorators;
+get_decorators(Queue) ->
+ amqqueue_v1:get_decorators(Queue).
+
+-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue().
+
+set_decorators(#amqqueue{} = Queue, Decorators) ->
+ Queue#amqqueue{decorators = Decorators};
+set_decorators(Queue, Decorators) ->
+ amqqueue_v1:set_decorators(Queue, Decorators).
+
+-spec get_exclusive_owner(amqqueue()) -> pid() | none.
+
+get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
+ Owner;
+get_exclusive_owner(Queue) ->
+ amqqueue_v1:get_exclusive_owner(Queue).
+
+-spec get_gm_pids(amqqueue()) -> [pid()] | none.
+
+get_gm_pids(#amqqueue{gm_pids = GMPids}) ->
+ GMPids;
+get_gm_pids(Queue) ->
+ amqqueue_v1:get_gm_pids(Queue).
+
+-spec set_gm_pids(amqqueue(), [pid()] | none) -> amqqueue().
+
+set_gm_pids(#amqqueue{} = Queue, GMPids) ->
+ Queue#amqqueue{gm_pids = GMPids};
+set_gm_pids(Queue, GMPids) ->
+ amqqueue_v1:set_gm_pids(Queue, GMPids).
+
+-spec get_leader(amqqueue_v2()) -> node().
+
+get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader.
+
+% operator_policy
+
+-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.
+
+get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy;
+get_operator_policy(Queue) -> amqqueue_v1:get_operator_policy(Queue).
+
+-spec set_operator_policy(amqqueue(), binary() | none | undefined) ->
+ amqqueue().
+
+set_operator_policy(#amqqueue{} = Queue, Policy) ->
+ Queue#amqqueue{operator_policy = Policy};
+set_operator_policy(Queue, Policy) ->
+ amqqueue_v1:set_operator_policy(Queue, Policy).
+
+% name
+
+-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
+
+get_name(#amqqueue{name = Name}) -> Name;
+get_name(Queue) -> amqqueue_v1:get_name(Queue).
+
+-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
+
+set_name(#amqqueue{} = Queue, Name) ->
+ Queue#amqqueue{name = Name};
+set_name(Queue, Name) ->
+ amqqueue_v1:set_name(Queue, Name).
+
+-spec get_options(amqqueue()) -> map().
+
+get_options(#amqqueue{options = Options}) -> Options;
+get_options(Queue) -> amqqueue_v1:get_options(Queue).
+
+% pid
+
+-spec get_pid
+(amqqueue_v2()) -> pid() | ra_server_id() | none;
+(amqqueue_v1:amqqueue_v1()) -> pid() | none.
+
+get_pid(#amqqueue{pid = Pid}) -> Pid;
+get_pid(Queue) -> amqqueue_v1:get_pid(Queue).
+
+-spec set_pid
+(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2();
+(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().
+
+set_pid(#amqqueue{} = Queue, Pid) ->
+ Queue#amqqueue{pid = Pid};
+set_pid(Queue, Pid) ->
+ amqqueue_v1:set_pid(Queue, Pid).
+
+% policy
+
+-spec get_policy(amqqueue()) -> binary() | none | undefined.
+
+get_policy(#amqqueue{policy = Policy}) -> Policy;
+get_policy(Queue) -> amqqueue_v1:get_policy(Queue).
+
+-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().
+
+set_policy(#amqqueue{} = Queue, Policy) ->
+ Queue#amqqueue{policy = Policy};
+set_policy(Queue, Policy) ->
+ amqqueue_v1:set_policy(Queue, Policy).
+
+% policy_version
+
+-spec get_policy_version(amqqueue()) -> non_neg_integer().
+
+get_policy_version(#amqqueue{policy_version = PV}) ->
+ PV;
+get_policy_version(Queue) ->
+ amqqueue_v1:get_policy_version(Queue).
+
+-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue().
+
+set_policy_version(#amqqueue{} = Queue, PV) ->
+ Queue#amqqueue{policy_version = PV};
+set_policy_version(Queue, PV) ->
+ amqqueue_v1:set_policy_version(Queue, PV).
+
+% recoverable_slaves
+
+-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none.
+
+get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
+ Slaves;
+get_recoverable_slaves(Queue) ->
+ amqqueue_v1:get_recoverable_slaves(Queue).
+
+-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue().
+
+set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
+ Queue#amqqueue{recoverable_slaves = Slaves};
+set_recoverable_slaves(Queue, Slaves) ->
+ amqqueue_v1:set_recoverable_slaves(Queue, Slaves).
+
+% quorum_nodes (new in v2)
+
+-spec get_quorum_nodes(amqqueue()) -> [node()].
+
+get_quorum_nodes(#amqqueue{quorum_nodes = Nodes}) -> Nodes;
+get_quorum_nodes(_) -> [].
+
+-spec set_quorum_nodes(amqqueue(), [node()]) -> amqqueue().
+
+set_quorum_nodes(#amqqueue{} = Queue, Nodes) ->
+ Queue#amqqueue{quorum_nodes = Nodes};
+set_quorum_nodes(Queue, _Nodes) ->
+ Queue.
+
+% slave_pids
+
+-spec get_slave_pids(amqqueue()) -> [pid()] | none.
+
+get_slave_pids(#amqqueue{slave_pids = Slaves}) ->
+ Slaves;
+get_slave_pids(Queue) ->
+ amqqueue_v1:get_slave_pids(Queue).
+
+-spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().
+
+set_slave_pids(#amqqueue{} = Queue, SlavePids) ->
+ Queue#amqqueue{slave_pids = SlavePids};
+set_slave_pids(Queue, SlavePids) ->
+ amqqueue_v1:set_slave_pids(Queue, SlavePids).
+
+% slave_pids_pending_shutdown
+
+-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()].
+
+get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) ->
+ Slaves;
+get_slave_pids_pending_shutdown(Queue) ->
+ amqqueue_v1:get_slave_pids_pending_shutdown(Queue).
+
+-spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue().
+
+set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) ->
+ Queue#amqqueue{slave_pids_pending_shutdown = SlavePids};
+set_slave_pids_pending_shutdown(Queue, SlavePids) ->
+ amqqueue_v1:set_slave_pids_pending_shutdown(Queue, SlavePids).
+
+% state
+
+-spec get_state(amqqueue()) -> atom() | none.
+
+get_state(#amqqueue{state = State}) -> State;
+get_state(Queue) -> amqqueue_v1:get_state(Queue).
+
+-spec set_state(amqqueue(), atom() | none) -> amqqueue().
+
+set_state(#amqqueue{} = Queue, State) ->
+ Queue#amqqueue{state = State};
+set_state(Queue, State) ->
+ amqqueue_v1:set_state(Queue, State).
+
+% sync_slave_pids
+
+-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none.
+
+get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) ->
+ Pids;
+get_sync_slave_pids(Queue) ->
+ amqqueue_v1:get_sync_slave_pids(Queue).
+
+-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().
+
+set_sync_slave_pids(#amqqueue{} = Queue, Pids) ->
+ Queue#amqqueue{sync_slave_pids = Pids};
+set_sync_slave_pids(Queue, Pids) ->
+ amqqueue_v1:set_sync_slave_pids(Queue, Pids).
+
+%% New in v2.
+
+-spec get_type(amqqueue()) -> atom().
+
+get_type(#amqqueue{type = Type}) -> Type;
+get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type.
+
+-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
+
+get_vhost(#amqqueue{vhost = VHost}) -> VHost;
+get_vhost(Queue) -> amqqueue_v1:get_vhost(Queue).
+
+-spec is_auto_delete(amqqueue()) -> boolean().
+
+is_auto_delete(#amqqueue{auto_delete = AutoDelete}) ->
+ AutoDelete;
+is_auto_delete(Queue) ->
+ amqqueue_v1:is_auto_delete(Queue).
+
+-spec is_durable(amqqueue()) -> boolean().
+
+is_durable(#amqqueue{durable = Durable}) -> Durable;
+is_durable(Queue) -> amqqueue_v1:is_durable(Queue).
+
+-spec is_classic(amqqueue()) -> boolean().
+
+is_classic(Queue) ->
+ get_type(Queue) =:= ?amqqueue_v1_type.
+
+-spec is_quorum(amqqueue()) -> boolean().
+
+is_quorum(Queue) ->
+ get_type(Queue) =:= quorum.
+
+fields() ->
+ case record_version_to_use() of
+ ?record_version -> fields(?record_version);
+ _ -> amqqueue_v1:fields()
+ end.
+
+fields(?record_version) -> record_info(fields, amqqueue);
+fields(Version) -> amqqueue_v1:fields(Version).
+
+field_vhost() ->
+ case record_version_to_use() of
+ ?record_version -> #amqqueue.vhost;
+ _ -> amqqueue_v1:field_vhost()
+ end.
+
+-spec pattern_match_all() -> amqqueue_pattern().
+
+pattern_match_all() ->
+ case record_version_to_use() of
+ ?record_version -> #amqqueue{_ = '_'};
+ _ -> amqqueue_v1:pattern_match_all()
+ end.
+
+-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern().
+
+pattern_match_on_name(Name) ->
+ case record_version_to_use() of
+ ?record_version -> #amqqueue{name = Name, _ = '_'};
+ _ -> amqqueue_v1:pattern_match_on_name(Name)
+ end.
+
+-spec pattern_match_on_type(atom()) -> amqqueue_pattern().
+
+pattern_match_on_type(Type) ->
+ case record_version_to_use() of
+ ?record_version -> #amqqueue{type = Type, _ = '_'};
+ _ when Type =:= classic -> amqqueue_v1:pattern_match_all();
+ %% FIXME: We try a pattern which should never match when the
+ %% `quorum_queue` feature flag is not enabled yet. Is there
+ %% a better solution?
+ _ -> amqqueue_v1:pattern_match_on_name(
+ rabbit_misc:r(<<0>>, queue, <<0>>))
+ end.
+
+-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().
+
+reset_mirroring_and_decorators(#amqqueue{} = Queue) ->
+ Queue#amqqueue{slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = [],
+ decorators = undefined};
+reset_mirroring_and_decorators(Queue) ->
+ amqqueue_v1:reset_mirroring_and_decorators(Queue).
+
+-spec set_immutable(amqqueue()) -> amqqueue().
+
+set_immutable(#amqqueue{} = Queue) ->
+ Queue#amqqueue{pid = none,
+ slave_pids = [],
+ sync_slave_pids = none,
+ recoverable_slaves = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none,
+ state = none};
+set_immutable(Queue) ->
+ amqqueue_v1:set_immutable(Queue).
+
+-spec qnode(amqqueue() | pid() | ra_server_id()) -> node().
+
+qnode(Queue) when ?is_amqqueue(Queue) ->
+ QPid = get_pid(Queue),
+ qnode(QPid);
+qnode(QPid) when is_pid(QPid) ->
+ node(QPid);
+qnode({_, Node}) ->
+ Node.
+
+% private
+
+macros() ->
+ io:format(
+ "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",
+ [?record_version, record_info(size, amqqueue)]),
+ %% The field number starts at 2 because the first element is the
+ %% record name.
+ macros(record_info(fields, amqqueue), 2).
+
+macros([Field | Rest], I) ->
+ io:format(
+ "-define(~s_field_~s(Q), element(~b, Q)).~n",
+ [?record_version, Field, I]),
+ macros(Rest, I + 1);
+macros([], _) ->
+ ok.
diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl
new file mode 100644
index 0000000000..0b35418d1d
--- /dev/null
+++ b/src/amqqueue_v1.erl
@@ -0,0 +1,403 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(amqqueue_v1).
+
+-include_lib("rabbit_common/include/resource.hrl").
+
+-export([new/8,
+ new_with_version/9,
+ fields/0,
+ fields/1,
+ field_vhost/0,
+ record_version_to_use/0,
+ upgrade/1,
+ upgrade_to/2,
+ % arguments
+ get_arguments/1,
+ set_arguments/2,
+ % decorators
+ get_decorators/1,
+ set_decorators/2,
+ % exclusive_owner
+ get_exclusive_owner/1,
+ % gm_pids
+ get_gm_pids/1,
+ set_gm_pids/2,
+ % name
+ get_name/1,
+ set_name/2,
+ % operator_policy
+ get_operator_policy/1,
+ set_operator_policy/2,
+ get_options/1,
+ % pid
+ get_pid/1,
+ set_pid/2,
+ % policy
+ get_policy/1,
+ set_policy/2,
+ % policy_version
+ get_policy_version/1,
+ set_policy_version/2,
+ % recoverable_slaves
+ get_recoverable_slaves/1,
+ set_recoverable_slaves/2,
+ % slave_pids
+ get_slave_pids/1,
+ set_slave_pids/2,
+ % slave_pids_pending_shutdown
+ get_slave_pids_pending_shutdown/1,
+ set_slave_pids_pending_shutdown/2,
+ % state
+ get_state/1,
+ set_state/2,
+ % sync_slave_pids
+ get_sync_slave_pids/1,
+ set_sync_slave_pids/2,
+ get_vhost/1,
+ is_amqqueue/1,
+ is_auto_delete/1,
+ is_durable/1,
+ pattern_match_all/0,
+ pattern_match_on_name/1,
+ reset_mirroring_and_decorators/1,
+ set_immutable/1,
+ macros/0]).
+
+-define(record_version, ?MODULE).
+
+-record(amqqueue, {
+ name :: rabbit_amqqueue:name() | '_', %% immutable
+ durable :: boolean() | '_', %% immutable
+ auto_delete :: boolean() | '_', %% immutable
+ exclusive_owner = none :: pid() | none | '_', %% immutable
+ arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
+ pid :: pid() | none | '_', %% durable (just so we
+ %% know home node)
+ slave_pids = [] :: [pid()] | none | '_', %% transient
+ sync_slave_pids = [] :: [pid()] | none| '_',%% transient
+ recoverable_slaves = [] :: [atom()] | none | '_', %% durable
+ policy :: binary() | none | undefined | '_', %% durable, implicit
+ %% update as above
+ operator_policy :: binary() | none | undefined | '_', %% durable,
+ %% implicit
+ %% update
+ %% as above
+ gm_pids = [] :: [pid()] | none | '_', %% transient
+ decorators :: [atom()] | none | undefined | '_', %% transient,
+ %% recalculated
+ %% as above
+ state = live :: atom() | none | '_', %% durable (have we crashed?)
+ policy_version = 0 :: non_neg_integer() | '_',
+ slave_pids_pending_shutdown = [] :: [pid()] | '_',
+ vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
+ options = #{} :: map() | '_'
+ }).
+
+-type amqqueue() :: amqqueue_v1().
+-type amqqueue_v1() :: #amqqueue{
+ name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: pid() | none,
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: pid() | none,
+ slave_pids :: [pid()] | none,
+ sync_slave_pids :: [pid()] | none,
+ recoverable_slaves :: [atom()] | none,
+ policy :: binary() | none | undefined,
+ operator_policy :: binary() | none | undefined,
+ gm_pids :: [pid()] | none,
+ decorators :: [atom()] | none | undefined,
+ state :: atom() | none,
+ policy_version :: non_neg_integer(),
+ slave_pids_pending_shutdown :: [pid()],
+ vhost :: rabbit_types:vhost() | undefined,
+ options :: map()
+ }.
+
+-type amqqueue_pattern() :: amqqueue_v1_pattern().
+-type amqqueue_v1_pattern() :: #amqqueue{
+ name :: rabbit_amqqueue:name() | '_',
+ durable :: '_',
+ auto_delete :: '_',
+ exclusive_owner :: '_',
+ arguments :: '_',
+ pid :: '_',
+ slave_pids :: '_',
+ sync_slave_pids :: '_',
+ recoverable_slaves :: '_',
+ policy :: '_',
+ operator_policy :: '_',
+ gm_pids :: '_',
+ decorators :: '_',
+ state :: '_',
+ policy_version :: '_',
+ slave_pids_pending_shutdown :: '_',
+ vhost :: '_',
+ options :: '_'
+ }.
+
+-export_type([amqqueue/0,
+ amqqueue_v1/0,
+ amqqueue_pattern/0,
+ amqqueue_v1_pattern/0]).
+
+-spec new(rabbit_amqqueue:name(),
+ pid() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map()) -> amqqueue().
+
+new(#resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options)
+ when (is_pid(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ new_with_version(
+ ?record_version,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options).
+
+-spec new_with_version(amqqueue_v1,
+ rabbit_amqqueue:name(),
+ pid() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map()) -> amqqueue().
+
+new_with_version(?record_version,
+ #resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options)
+ when (is_pid(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ #amqqueue{name = Name,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = Pid,
+ vhost = VHost,
+ options = Options}.
+
+-spec is_amqqueue(any()) -> boolean().
+
+is_amqqueue(#amqqueue{}) -> true;
+is_amqqueue(_) -> false.
+
+-spec record_version_to_use() -> amqqueue_v1.
+
+record_version_to_use() ->
+ ?record_version.
+
+-spec upgrade(amqqueue()) -> amqqueue().
+
+upgrade(#amqqueue{} = Queue) ->
+ Queue.
+
+-spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue().
+
+upgrade_to(?record_version, #amqqueue{} = Queue) ->
+ Queue.
+
+% arguments
+
+-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table().
+
+get_arguments(#amqqueue{arguments = Args}) -> Args.
+
+-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue().
+
+set_arguments(#amqqueue{} = Queue, Args) ->
+ Queue#amqqueue{arguments = Args}.
+
+% decorators
+
+get_decorators(#amqqueue{decorators = Decorators}) -> Decorators.
+
+set_decorators(#amqqueue{} = Queue, Decorators) ->
+ Queue#amqqueue{decorators = Decorators}.
+
+get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner.
+
+% gm_pids
+
+get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids.
+
+set_gm_pids(#amqqueue{} = Queue, GMPids) ->
+ Queue#amqqueue{gm_pids = GMPids}.
+
+% name
+
+get_name(#amqqueue{name = Name}) -> Name.
+
+set_name(#amqqueue{} = Queue, Name) ->
+ Queue#amqqueue{name = Name}.
+
+% operator_policy
+
+get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy.
+
+set_operator_policy(#amqqueue{} = Queue, OpPolicy) ->
+ Queue#amqqueue{operator_policy = OpPolicy}.
+
+get_options(#amqqueue{options = Options}) -> Options.
+
+% pid
+
+get_pid(#amqqueue{pid = Pid}) -> Pid.
+
+set_pid(#amqqueue{} = Queue, Pid) ->
+ Queue#amqqueue{pid = Pid}.
+
+% policy
+
+get_policy(#amqqueue{policy = Policy}) -> Policy.
+
+set_policy(#amqqueue{} = Queue, Policy) ->
+ Queue#amqqueue{policy = Policy}.
+
+% policy_version
+
+get_policy_version(#amqqueue{policy_version = PV}) ->
+ PV.
+
+set_policy_version(#amqqueue{} = Queue, PV) ->
+ Queue#amqqueue{policy_version = PV}.
+
+% recoverable_slaves
+
+get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
+ Slaves.
+
+set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
+ Queue#amqqueue{recoverable_slaves = Slaves}.
+
+% slave_pids
+
+get_slave_pids(#amqqueue{slave_pids = Slaves}) ->
+ Slaves.
+
+set_slave_pids(#amqqueue{} = Queue, SlavePids) ->
+ Queue#amqqueue{slave_pids = SlavePids}.
+
+% slave_pids_pending_shutdown
+
+get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) ->
+ Slaves.
+
+set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) ->
+ Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}.
+
+% state
+
+get_state(#amqqueue{state = State}) -> State.
+
+set_state(#amqqueue{} = Queue, State) -> Queue#amqqueue{state = State}.
+
+% sync_slave_pids
+
+get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> Pids.
+
+set_sync_slave_pids(#amqqueue{} = Queue, Pids) ->
+ Queue#amqqueue{sync_slave_pids = Pids}.
+
+get_vhost(#amqqueue{vhost = VHost}) -> VHost.
+
+is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete.
+
+is_durable(#amqqueue{durable = Durable}) -> Durable.
+
+fields() -> fields(?record_version).
+
+fields(?record_version) -> record_info(fields, amqqueue).
+
+field_vhost() -> #amqqueue.vhost.
+
+-spec pattern_match_all() -> amqqueue_pattern().
+
+pattern_match_all() -> #amqqueue{_ = '_'}.
+
+-spec pattern_match_on_name(rabbit_amqqueue:name()) ->
+ amqqueue_pattern().
+
+pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}.
+
+reset_mirroring_and_decorators(#amqqueue{} = Queue) ->
+ Queue#amqqueue{slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = [],
+ decorators = undefined}.
+
+set_immutable(#amqqueue{} = Queue) ->
+ Queue#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ recoverable_slaves = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none,
+ state = none}.
+
+macros() ->
+ io:format(
+ "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",
+ [?record_version, record_info(size, amqqueue)]),
+ %% The field number starts at 2 because the first element is the
+ %% record name.
+ macros(record_info(fields, amqqueue), 2).
+
+macros([Field | Rest], I) ->
+ io:format(
+ "-define(~s_field_~s(Q), element(~b, Q)).~n",
+ [?record_version, Field, I]),
+ macros(Rest, I + 1);
+macros([], _) ->
+ ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 63d89ff4a7..ced59a8c62 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -21,11 +21,12 @@
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1, delete_crashed/1, delete_crashed/2,
delete_crashed_internal/2]).
--export([pseudo_queue/2, immutable/1]).
+-export([pseudo_queue/2, pseudo_queue/3, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver/3, requeue/4, ack/4, reject/5]).
+-export([not_found/1, absent/2]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
@@ -56,6 +57,7 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-include("amqqueue.hrl").
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long,
unsignedbyte, unsignedshort, unsignedint]).
@@ -71,99 +73,102 @@
-type name() :: rabbit_types:r('queue').
-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
--type qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return()).
+-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
-type qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}.
-type msg_id() :: non_neg_integer().
-type ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
-type absent_reason() :: 'nodedown' | 'crashed'.
--type queue_or_absent() :: rabbit_types:amqqueue() |
- {'absent', rabbit_types:amqqueue(),absent_reason()}.
+-type queue_or_absent() :: amqqueue:amqqueue() |
+ {'absent', amqqueue:amqqueue(),absent_reason()}.
-type not_found_or_absent() ::
- 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}.
--spec recover(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+ 'not_found' | {'absent', amqqueue:amqqueue(), absent_reason()}.
+-spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
-spec stop(rabbit_types:vhost()) -> 'ok'.
--spec start([rabbit_types:amqqueue()]) -> 'ok'.
+-spec start([amqqueue:amqqueue()]) -> 'ok'.
-spec declare
(name(), boolean(), boolean(), rabbit_framing:amqp_table(),
rabbit_types:maybe(pid()), rabbit_types:username()) ->
{'new' | 'existing' | 'absent' | 'owner_died',
- rabbit_types:amqqueue()} |
- {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()} |
+ amqqueue:amqqueue()} |
+ {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
rabbit_types:channel_exit().
-spec declare
(name(), boolean(), boolean(), rabbit_framing:amqp_table(),
rabbit_types:maybe(pid()), rabbit_types:username(), node()) ->
- {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
- {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()} |
- {'absent', rabbit_types:amqqueue(), absent_reason()} |
+ {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
+ {'new', amqqueue:amqqueue(), rabbit_fifo_client:state()} |
+ {'absent', amqqueue:amqqueue(), absent_reason()} |
rabbit_types:channel_exit().
--spec internal_declare(rabbit_types:amqqueue(), boolean()) ->
+-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
queue_or_absent() | rabbit_misc:thunk(queue_or_absent()).
-spec update
- (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) ->
- 'not_found' | rabbit_types:amqqueue().
+ (name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
+ 'not_found' | amqqueue:amqqueue().
-spec lookup
(name()) ->
- rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:ok(amqqueue:amqqueue()) |
rabbit_types:error('not_found');
([name()]) ->
- [rabbit_types:amqqueue()].
+ [amqqueue:amqqueue()].
-spec not_found_or_absent(name()) -> not_found_or_absent().
-spec with(name(), qfun(A)) ->
A | rabbit_types:error(not_found_or_absent()).
-spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B.
-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
+-spec not_found(rabbit_types:r(atom())) -> rabbit_types:channel_exit().
+-spec absent(amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()) ->
+ rabbit_types:channel_exit().
-spec assert_equivalence
- (rabbit_types:amqqueue(), boolean(), boolean(),
+ (amqqueue:amqqueue(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
--spec check_exclusive_access(rabbit_types:amqqueue(), pid()) ->
+-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
'ok' | rabbit_types:channel_exit().
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
A | rabbit_types:channel_exit().
--spec list() -> [rabbit_types:amqqueue()].
--spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec list() -> [amqqueue:amqqueue()].
+-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
-spec list_names() -> [rabbit_amqqueue:name()].
--spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
--spec list_by_type(atom()) -> [rabbit_types:amqqueue()].
+-spec list_down(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
-spec info_keys() -> rabbit_types:info_keys().
--spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
--spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
+-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
+-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) ->
rabbit_types:infos().
-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
[rabbit_types:infos()].
--spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
--spec consumers(rabbit_types:amqqueue()) ->
+-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'.
+-spec consumers(amqqueue:amqqueue()) ->
[{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
rabbit_framing:amqp_table()}].
-spec consumer_info_keys() -> rabbit_types:info_keys().
-spec consumers_all(rabbit_types:vhost()) ->
[{name(), pid(), rabbit_types:ctag(), boolean(),
non_neg_integer(), rabbit_framing:amqp_table()}].
--spec stat(rabbit_types:amqqueue()) ->
+-spec stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
-spec delete_immediately(qpids()) -> 'ok'.
-spec delete_exclusive(qpids(), pid()) -> 'ok'.
-spec delete
- (rabbit_types:amqqueue(), 'false', 'false', rabbit_types:username()) ->
+ (amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
qlen();
- (rabbit_types:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
+ (amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
qlen() | rabbit_types:error('in_use');
- (rabbit_types:amqqueue(), 'false', 'true', rabbit_types:username()) ->
+ (amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
qlen() | rabbit_types:error('not_empty');
- (rabbit_types:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
+ (amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty').
--spec delete_crashed(rabbit_types:amqqueue()) -> 'ok'.
--spec delete_crashed_internal(rabbit_types:amqqueue(), rabbit_types:username()) -> 'ok'.
--spec purge(rabbit_types:amqqueue()) -> {ok, qlen()}.
+-spec delete_crashed(amqqueue:amqqueue()) -> 'ok'.
+-spec delete_crashed_internal(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
+-spec purge(amqqueue:amqqueue()) -> {ok, qlen()}.
-spec forget_all_durable(node()) -> 'ok'.
--spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') ->
+-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery(), #{Name :: atom() => rabbit_fifo_client:state()} | 'untracked') ->
{qpids(), #{Name :: atom() => rabbit_fifo_client:state()}}.
--spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery()) -> 'ok'.
+-spec deliver([amqqueue:amqqueue()], rabbit_types:delivery()) -> 'ok'.
-spec requeue(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
-spec ack(pid(), [msg_id()], pid(), #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
-spec reject(pid() | {atom(), node()}, [msg_id()], boolean(), pid(),
@@ -172,24 +177,24 @@
-spec notify_down_all(qpids(), pid(), non_neg_integer()) ->
ok_or_errors().
-spec activate_limit_all(qpids(), pid()) -> ok_or_errors().
--spec basic_get(rabbit_types:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(),
+-spec basic_get(amqqueue:amqqueue(), pid(), boolean(), pid(), rabbit_types:ctag(),
#{Name :: atom() => rabbit_fifo_client:state()}) ->
{'ok', non_neg_integer(), qmsg()} | 'empty'.
-spec credit
- (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
+ (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
boolean(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
'ok'.
-spec basic_consume
- (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ (amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
non_neg_integer(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table(), any(), rabbit_types:username(),
#{Name :: atom() => rabbit_fifo_client:state()}) ->
rabbit_types:ok_or_error('exclusive_consume_unavailable').
-spec basic_cancel
- (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any(),
+ (amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(),
rabbit_types:username(), #{Name :: atom() => rabbit_fifo_client:state()}) ->
'ok' | {'ok', #{Name :: atom() => rabbit_fifo_client:state()}}.
--spec notify_decorators(rabbit_types:amqqueue()) -> 'ok'.
+-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
-spec resume(pid(), pid()) -> 'ok'.
-spec internal_delete(name(), rabbit_types:username()) ->
'ok' | rabbit_types:connection_exit() |
@@ -202,20 +207,21 @@
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-spec on_node_up(node()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
--spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
--spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
--spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
+-spec pseudo_queue(name(), pid()) -> amqqueue:amqqueue().
+-spec pseudo_queue(name(), pid(), boolean()) -> amqqueue:amqqueue().
+-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
+-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
-spec update_decorators(name()) -> 'ok'.
--spec policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+-spec policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) ->
'ok'.
-spec update_mirroring(pid()) -> 'ok'.
--spec sync_mirrors(rabbit_types:amqqueue() | pid()) ->
+-spec sync_mirrors(amqqueue:amqqueue() | pid()) ->
'ok' | rabbit_types:error('not_mirrored').
--spec cancel_sync_mirrors(rabbit_types:amqqueue() | pid()) ->
+-spec cancel_sync_mirrors(amqqueue:amqqueue() | pid()) ->
'ok' | {'ok', 'not_syncing'}.
--spec is_replicated(rabbit_types:amqqueue()) -> boolean().
+-spec is_replicated(amqqueue:amqqueue()) -> boolean().
--spec pid_of(rabbit_types:amqqueue()) ->
+-spec pid_of(amqqueue:amqqueue()) ->
{'ok', pid()} | rabbit_types:error('not_found').
-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
{'ok', pid()} | rabbit_types:error('not_found').
@@ -252,7 +258,7 @@ recover_classic_queues(VHost, Queues) ->
%% order as the supplied queue names, so that we can zip them together
%% for further processing in recover_durable_queues.
{ok, OrderedRecoveryTerms} =
- BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
+ BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
{ok, _} ->
recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
@@ -262,7 +268,7 @@ recover_classic_queues(VHost, Queues) ->
end.
filter_per_type(Queues) ->
- lists:partition(fun(#amqqueue{type = Type}) -> Type == classic end, Queues).
+ lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
@@ -287,47 +293,42 @@ start(Qs) ->
%% visible to routing, so now it is safe for them to complete
%% their initialisation (which may involve interacting with other
%% queues).
- _ = [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Classic],
+ _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
ok.
mark_local_durable_queues_stopped(VHost) ->
Qs = find_local_durable_classic_queues(VHost),
rabbit_misc:execute_mnesia_transaction(
fun() ->
- [ store_queue(Q#amqqueue{ state = stopped })
- || Q = #amqqueue{ state = State } <- Qs,
- State =/= stopped ]
+ [ store_queue(amqqueue:set_state(Q, stopped))
+ || Q <- Qs,
+ amqqueue:get_state(Q) =/= stopped ]
end).
find_local_quorum_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{vhost = VH,
- type = quorum,
- quorum_nodes = QuorumNodes}
- <- mnesia:table(rabbit_durable_queue),
- VH =:= VHost,
- (lists:member(Node, QuorumNodes))]))
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ amqqueue:get_vhost(Q) =:= VHost,
+ amqqueue:is_quorum(Q) andalso
+ (lists:member(Node, amqqueue:get_quorum_nodes(Q)))]))
end).
find_local_durable_classic_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
- vhost = VH,
- pid = Pid,
- type = classic}
- <- mnesia:table(rabbit_durable_queue),
- VH =:= VHost,
- (is_local_to_node(Pid, Node) andalso
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ amqqueue:get_vhost(Q) =:= VHost,
+ amqqueue:is_classic(Q) andalso
+ (is_local_to_node(amqqueue:get_pid(Q), Node) andalso
%% Terminations on node down will not remove the rabbit_queue
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
- (mnesia:read(rabbit_queue, Name, read) =:= []
- orelse not rabbit_mnesia:is_process_alive(Pid)))
+ (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []
+ orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))))
]))
end).
@@ -335,20 +336,16 @@ find_recoverable_queues() ->
Node = node(),
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
- pid = Pid,
- type = Type,
- quorum_nodes = QuorumNodes}
- <- mnesia:table(rabbit_durable_queue),
- (Type == classic andalso
- (is_local_to_node(Pid, Node) andalso
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ (amqqueue:is_classic(Q) andalso
+ (is_local_to_node(amqqueue:get_pid(Q), Node) andalso
%% Terminations on node down will not remove the rabbit_queue
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
- (mnesia:read(rabbit_queue, Name, read) =:= []
- orelse not rabbit_mnesia:is_process_alive(Pid))))
- orelse (Type == quorum andalso lists:member(Node, QuorumNodes))
+ (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []
+ orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)))))
+ orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, amqqueue:get_quorum_nodes(Q)))
]))
end).
@@ -372,32 +369,39 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
ok = check_declare_arguments(QueueName, Args),
Type = get_queue_type(Args),
- Q = rabbit_queue_decorator:set(
- rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- recoverable_slaves = [],
- gm_pids = [],
- state = live,
- policy_version = 0,
- slave_pids_pending_shutdown = [],
- vhost = VHost,
- options = #{user => ActingUser},
- type = Type})),
-
- case Type of
- classic ->
- declare_classic_queue(Q, Node);
- quorum ->
- rabbit_quorum_queue:declare(Q)
+ TypeIsAllowed =
+ Type =:= classic orelse
+ rabbit_feature_flags:is_enabled(quorum_queue),
+ case TypeIsAllowed of
+ true ->
+ Q0 = amqqueue:new(QueueName,
+ none,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ #{user => ActingUser},
+ Type),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(Q0)),
+ do_declare(Q, Node);
+ false ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "Cannot declare a queue '~s' of type '~s' on node '~s': "
+ "the 'quorum_queue' feature flag is disabled",
+ [rabbit_misc:rs(QueueName), Type, Node])
end.
-declare_classic_queue(#amqqueue{name = QName, vhost = VHost} = Q, Node) ->
+do_declare(Q, Node) when ?amqqueue_is_classic(Q) ->
+ declare_classic_queue(Q, Node);
+do_declare(Q, _Node) when ?amqqueue_is_quorum(Q) ->
+ rabbit_quorum_queue:declare(Q).
+
+declare_classic_queue(Q, Node) ->
+ QName = amqqueue:get_name(Q),
+ VHost = amqqueue:get_vhost(Q),
Node1 = case rabbit_queue_master_location_misc:get_location(Q) of
{ok, Node0} -> Node0;
{error, _} -> Node
@@ -425,17 +429,18 @@ get_queue_type(Args) ->
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
- ok = store_queue(Q#amqqueue{state = live}),
+ ok = store_queue(amqqueue:set_state(Q, live)),
rabbit_misc:const({created, Q})
end);
-internal_declare(Q = #amqqueue{name = QueueName}, false) ->
+internal_declare(Q, false) ->
+ QueueName = amqqueue:get_name(Q),
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case not_found_or_absent(QueueName) of
not_found -> Q1 = rabbit_policy:set(Q),
- Q2 = Q1#amqqueue{state = live},
+ Q2 = amqqueue:set_state(Q1, live),
ok = store_queue(Q2),
fun () -> {created, Q2} end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
@@ -447,7 +452,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
update(Name, Fun) ->
case mnesia:wread({rabbit_queue, Name}) of
- [Q = #amqqueue{durable = Durable}] ->
+ [Q] ->
+ Durable = amqqueue:is_durable(Q),
Q1 = Fun(Q),
ok = mnesia:write(rabbit_queue, Q1, write),
case Durable of
@@ -468,14 +474,11 @@ ensure_rabbit_queue_record_is_initialized(Q) ->
rabbit_misc:const({ok, Q})
end).
-store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(rabbit_durable_queue,
- Q#amqqueue{slave_pids = [],
- sync_slave_pids = [],
- gm_pids = [],
- decorators = undefined}, write),
+store_queue(Q) when ?amqqueue_is_durable(Q) ->
+ Q1 = amqqueue:reset_mirroring_and_decorators(Q),
+ ok = mnesia:write(rabbit_durable_queue, Q1, write),
store_queue_ram(Q);
-store_queue(Q = #amqqueue{durable = false}) ->
+store_queue(Q) when not ?amqqueue_is_durable(Q) ->
store_queue_ram(Q).
store_queue_ram(Q) ->
@@ -491,8 +494,9 @@ update_decorators(Name) ->
end
end).
-policy_changed(Q1 = #amqqueue{decorators = Decorators1},
- Q2 = #amqqueue{decorators = Decorators2}) ->
+policy_changed(Q1, Q2) ->
+ Decorators1 = amqqueue:get_decorators(Q1),
+ Decorators2 = amqqueue:get_decorators(Q2),
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
D1 = rabbit_queue_decorator:select(Decorators1),
D2 = rabbit_queue_decorator:select(Decorators2),
@@ -532,20 +536,20 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
+ {ok, Q} when ?amqqueue_state_is(Q, live) andalso RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
- {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ {ok, Q} when ?amqqueue_state_is(Q, stopped) andalso RetriesLeft =:= 0 ->
%% The queue was stopped and not migrated
E({absent, Q, stopped});
%% The queue process has crashed with unknown error
- {ok, Q = #amqqueue{state = crashed}} ->
+ {ok, Q} when ?amqqueue_state_is(Q, crashed) ->
E({absent, Q, crashed});
%% The queue process has been stopped by a supervisor.
%% In that case a synchronised slave can take over
%% so we should retry.
- {ok, Q = #amqqueue{state = stopped}} ->
+ {ok, Q} when ?amqqueue_state_is(Q, stopped) ->
%% The queue process was stopped by the supervisor
rabbit_misc:with_exit_handler(
fun () -> retry_wait(Q, F, E, RetriesLeft) end,
@@ -553,7 +557,7 @@ with(Name, F, E, RetriesLeft) ->
%% The queue is supposed to be active.
%% The master node can go away or queue can be killed
%% so we retry, waiting for a slave to take over.
- {ok, Q = #amqqueue{state = live}} ->
+ {ok, Q} when ?amqqueue_state_is(Q, live) ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -567,7 +571,10 @@ with(Name, F, E, RetriesLeft) ->
E(not_found_or_absent_dirty(Name))
end.
-retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+retry_wait(Q, F, E, RetriesLeft) ->
+ Name = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
+ QState = amqqueue:get_state(Q),
case {QState, is_replicated(Q)} of
%% We don't want to repeat an operation if
%% there are no slaves to migrate to
@@ -592,14 +599,47 @@ retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, Retries
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
- with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
- ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ with(Name, F, fun (not_found) -> not_found(Name);
+ ({absent, Q, Reason}) -> absent(Q, Reason)
end).
-assert_equivalence(#amqqueue{name = QName,
- durable = Durable,
- auto_delete = AD} = Q,
- Durable1, AD1, Args1, Owner) ->
+not_found(R) -> rabbit_misc:protocol_error(not_found, "no ~s", [rabbit_misc:rs(R)]).
+
+absent(Q, AbsentReason) ->
+ QueueName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
+ IsDurable = amqqueue:is_durable(Q),
+ priv_absent(QueueName, QPid, IsDurable, AbsentReason).
+
+priv_absent(QueueName, QPid, true, nodedown) ->
+ %% The assertion of durability is mainly there because we mention
+ %% durability in the error message. That way we will hopefully
+ %% notice if at some future point our logic changes s.t. we get
+ %% here with non-durable queues.
+ rabbit_misc:protocol_error(
+ not_found,
+ "home node '~s' of durable ~s is down or inaccessible",
+ [node(QPid), rabbit_misc:rs(QueueName)]);
+
+priv_absent(QueueName, _QPid, _IsDurable, stopped) ->
+ rabbit_misc:protocol_error(
+ not_found,
+ "~s process is stopped by supervisor", [rabbit_misc:rs(QueueName)]);
+
+priv_absent(QueueName, _QPid, _IsDurable, crashed) ->
+ rabbit_misc:protocol_error(
+ not_found,
+ "~s has crashed and failed to restart", [rabbit_misc:rs(QueueName)]);
+
+priv_absent(QueueName, _QPid, _IsDurable, timeout) ->
+ rabbit_misc:protocol_error(
+ not_found,
+ "failed to perform operation on ~s due to timeout", [rabbit_misc:rs(QueueName)]).
+
+assert_equivalence(Q, Durable1, AD1, Args1, Owner) ->
+ QName = amqqueue:get_name(Q),
+ Durable = amqqueue:is_durable(Q),
+ AD = amqqueue:is_auto_delete(Q),
rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable),
rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete),
assert_args_equivalence(Q, Args1),
@@ -607,11 +647,14 @@ assert_equivalence(#amqqueue{name = QName,
check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
-check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+check_exclusive_access(Q, Owner, _MatchType)
+ when ?amqqueue_exclusive_owner_is(Q, Owner) ->
ok;
-check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+check_exclusive_access(Q, _ReaderPid, lax)
+ when ?amqqueue_exclusive_owner_is(Q, none) ->
ok;
-check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+check_exclusive_access(Q, _ReaderPid, _MatchType) ->
+ QueueName = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
resource_locked,
"cannot obtain exclusive access to locked ~s",
@@ -621,8 +664,9 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
with_or_die(Name,
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
-assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
- RequiredArgs) ->
+assert_args_equivalence(Q, RequiredArgs) ->
+ QueueName = amqqueue:get_name(Q),
+ Args = amqqueue:get_arguments(Q),
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[Key || {Key, _Fun} <- declare_args()]).
@@ -750,41 +794,37 @@ check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+list() -> mnesia:dirty_match_object(rabbit_queue, amqqueue:pattern_match_all()).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
-list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
+list_names(VHost) -> [amqqueue:get_name(Q) || Q <- list(VHost)].
list_local_names() ->
- [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
- State =/= crashed, is_local_to_node(QPid, node())].
+ [ amqqueue:get_name(Q) || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
fun () ->
mnesia:match_object(rabbit_durable_queue,
- #amqqueue{_ = '_', type = Type}, read)
+ amqqueue:pattern_match_on_type(Type),
+ read)
end),
Qs.
list_local_followers() ->
- [ Q#amqqueue.name
- || #amqqueue{state = State, type = quorum, pid = {_, Leader},
- quorum_nodes = Nodes} = Q <- list(),
- State =/= crashed, Leader =/= node(), lists:member(node(), Nodes)].
+ [ amqqueue:get_name(Q)
+ || Q <- list(),
+ amqqueue:is_quorum(Q),
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), amqqueue:get_quorum_nodes(Q))].
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
Node =:= Leader.
-qnode(QPid) when ?IS_CLASSIC(QPid) ->
- node(QPid);
-qnode({_, Node} = QPid) when ?IS_QUORUM(QPid) ->
- Node.
-
list(VHostPath) ->
list(VHostPath, rabbit_queue).
@@ -795,7 +835,7 @@ list(VHostPath, TableName) ->
fun () ->
mnesia:match_object(
TableName,
- #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'},
+ amqqueue:pattern_match_on_name(rabbit_misc:r(VHostPath, queue)),
read)
end).
@@ -805,8 +845,9 @@ list_down(VHostPath) ->
true ->
Present = list(VHostPath),
Durable = list(VHostPath, rabbit_durable_queue),
- PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]),
- sets:to_list(sets:filter(fun (#amqqueue{name = N}) ->
+ PresentS = sets:from_list([amqqueue:get_name(Q) || Q <- Present]),
+ sets:to_list(sets:filter(fun (Q) ->
+ N = amqqueue:get_name(Q),
not sets:is_element(N, PresentS)
end, sets:from_list(Durable)))
end.
@@ -818,7 +859,7 @@ count(VHost) ->
%% won't work here because with master migration of mirrored queues
%% the "ownership" of queues by nodes becomes a non-trivial problem
%% that requires a proper consensus algorithm.
- length(mnesia:dirty_index_read(rabbit_queue, VHost, #amqqueue.vhost))
+ length(mnesia:dirty_index_read(rabbit_queue, VHost, amqqueue:field_vhost()))
catch _:Err ->
rabbit_log:error("Failed to fetch number of queues in vhost ~p:~n~p~n",
[VHost, Err]),
@@ -829,9 +870,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
-is_unresponsive(#amqqueue{ state = crashed }, _Timeout) ->
+is_unresponsive(Q, _Timeout) when ?amqqueue_state_is(Q, crashed) ->
false;
-is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
+is_unresponsive(Q, Timeout) ->
+ QPid = amqqueue:get_pid(Q),
try
delegate:invoke(QPid, {gen_server2, call, [{info, [name]}, Timeout]}),
false
@@ -841,21 +883,24 @@ is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
true
end.
-format(Q = #amqqueue{ type = quorum }) -> rabbit_quorum_queue:format(Q);
+format(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:format(Q);
format(_) -> [].
-info(Q = #amqqueue{ type = quorum }) -> rabbit_quorum_queue:info(Q);
-info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
-info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
-info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
+info(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:info(Q);
+info(Q) when ?amqqueue_state_is(Q, crashed) -> info_down(Q, crashed);
+info(Q) when ?amqqueue_state_is(Q, stopped) -> info_down(Q, stopped);
+info(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
-info(Q = #amqqueue{ type = quorum }, Items) ->
+info(Q, Items) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue:info(Q, Items);
-info(Q = #amqqueue{ state = crashed }, Items) ->
+info(Q, Items) when ?amqqueue_state_is(Q, crashed) ->
info_down(Q, Items, crashed);
-info(Q = #amqqueue{ state = stopped }, Items) ->
+info(Q, Items) when ?amqqueue_state_is(Q, stopped) ->
info_down(Q, Items, stopped);
-info(#amqqueue{ pid = QPid }, Items) ->
+info(Q, Items) ->
+ QPid = amqqueue:get_pid(Q),
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
@@ -867,13 +912,13 @@ info_down(Q, DownReason) ->
info_down(Q, Items, DownReason) ->
[{Item, i_down(Item, Q, DownReason)} || Item <- Items].
-i_down(name, #amqqueue{name = Name}, _) -> Name;
-i_down(durable, #amqqueue{durable = Dur}, _) -> Dur;
-i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
-i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
-i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
-i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS;
-i_down(state, _Q, DownReason) -> DownReason;
+i_down(name, Q, _) -> amqqueue:get_name(Q);
+i_down(durable, Q, _) -> amqqueue:is_durable(Q);
+i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q);
+i_down(arguments, Q, _) -> amqqueue:get_arguments(Q);
+i_down(pid, Q, _) -> amqqueue:get_pid(Q);
+i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q);
+i_down(state, _Q, DownReason) -> DownReason;
i_down(K, _Q, _DownReason) ->
case lists:member(K, rabbit_amqqueue_process:info_keys()) of
true -> '';
@@ -919,18 +964,22 @@ info_local(VHostPath) ->
map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
list_local(VHostPath) ->
- [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
- State =/= crashed, is_local_to_node(QPid, node()) ].
+ [Q || Q <- list(VHostPath),
+ amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
-notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+notify_policy_changed(Q) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
gen_server2:cast(QPid, policy_changed);
-notify_policy_changed(#amqqueue{pid = QPid,
- name = QName}) when ?IS_QUORUM(QPid) ->
+notify_policy_changed(Q) when ?amqqueue_is_quorum(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
rabbit_quorum_queue:policy_changed(QName, QPid).
-consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+consumers(Q) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
-consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) ->
+consumers(Q) when ?amqqueue_is_quorum(Q) ->
+ QPid = amqqueue:get_pid(Q),
{ok, {_, Result}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_consumers/1),
maps:values(Result).
@@ -957,14 +1006,14 @@ emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
[lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, CTag,
+ [amqqueue:get_name(Q), ChPid, CTag,
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
-stat(#amqqueue{type = quorum} = Q) -> rabbit_quorum_queue:stat(Q);
-stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
+stat(Q) when ?amqqueue_is_quorum(Q) -> rabbit_quorum_queue:stat(Q);
+stat(Q) -> delegate:invoke(amqqueue:get_pid(Q), {gen_server2, call, [stat, infinity]}).
-pid_of(#amqqueue{pid = Pid}) -> Pid.
+pid_of(Q) -> amqqueue:get_pid(Q).
pid_of(VHost, QueueName) ->
case lookup(rabbit_misc:r(VHost, queue, QueueName)) of
{ok, Q} -> pid_of(Q);
@@ -990,15 +1039,16 @@ delete_immediately_by_resource(Resources) ->
|| {Resource, QPid} <- Quorum],
ok.
-delete(#amqqueue{ type = quorum} = Q,
- IfUnused, IfEmpty, ActingUser) ->
+delete(Q,
+ IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue:delete(Q, IfUnused, IfEmpty, ActingUser);
delete(Q, IfUnused, IfEmpty, ActingUser) ->
case wait_for_promoted_or_stopped(Q) of
- {promoted, #amqqueue{pid = QPid}} ->
+ {promoted, Q1} ->
+ QPid = amqqueue:get_pid(Q1),
delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]});
{stopped, Q1} ->
- #resource{name = Name, virtual_host = Vhost} = Q1#amqqueue.name,
+ #resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q1),
case IfEmpty of
true ->
rabbit_log:error("Queue ~s in vhost ~s has its master node down and "
@@ -1020,10 +1070,16 @@ delete(Q, IfUnused, IfEmpty, ActingUser) ->
{ok, 0}
end.
--spec wait_for_promoted_or_stopped(#amqqueue{}) -> {promoted, #amqqueue{}} | {stopped, #amqqueue{}} | {error, not_found}.
-wait_for_promoted_or_stopped(#amqqueue{name = QName}) ->
+-spec wait_for_promoted_or_stopped(amqqueue:amqqueue()) ->
+ {promoted, amqqueue:amqqueue()} |
+ {stopped, amqqueue:amqqueue()} |
+ {error, not_found}.
+wait_for_promoted_or_stopped(Q0) ->
+ QName = amqqueue:get_name(Q0),
case lookup(QName) of
- {ok, Q = #amqqueue{pid = QPid, slave_pids = SPids}} ->
+ {ok, Q} ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
case rabbit_mnesia:is_process_alive(QPid) of
true -> {promoted, Q};
false ->
@@ -1046,17 +1102,20 @@ wait_for_promoted_or_stopped(#amqqueue{name = QName}) ->
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
-delete_crashed(#amqqueue{ pid = QPid } = Q, ActingUser) ->
- ok = rpc:call(qnode(QPid), ?MODULE, delete_crashed_internal, [Q, ActingUser]).
+delete_crashed(Q, ActingUser) ->
+ ok = rpc:call(amqqueue:qnode(Q), ?MODULE, delete_crashed_internal, [Q, ActingUser]).
-delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) ->
+delete_crashed_internal(Q, ActingUser) ->
+ QName = amqqueue:get_name(Q),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
BQ:delete_crashed(Q),
ok = internal_delete(QName, ActingUser).
-purge(#amqqueue{ pid = QPid, type = classic}) ->
+purge(Q) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [purge, infinity]});
-purge(#amqqueue{ pid = NodeId, type = quorum}) ->
+purge(Q) when ?amqqueue_is_quorum(Q) ->
+ NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
@@ -1129,25 +1188,31 @@ activate_limit_all(QRefs, ChPid) ->
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
-credit(#amqqueue{pid = QPid, type = classic}, ChPid, CTag, Credit,
- Drain, QStates) ->
+credit(Q, ChPid, CTag, Credit,
+ Drain, QStates) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast,
[{credit, ChPid, CTag, Credit, Drain}]}),
{ok, QStates};
-credit(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum},
+credit(Q,
_ChPid, CTag, Credit,
- Drain, QStates) ->
+ Drain, QStates) when ?amqqueue_is_quorum(Q) ->
+ {Name, _} = Id = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
QState0 = get_quorum_state(Id, QName, QStates),
{ok, QState} = rabbit_quorum_queue:credit(CTag, Credit, Drain, QState0),
{ok, maps:put(Name, QState, QStates)}.
-basic_get(#amqqueue{pid = QPid, type = classic}, ChPid, NoAck, LimiterPid,
- _CTag, _) ->
+basic_get(Q, ChPid, NoAck, LimiterPid, _CTag, _)
+ when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call,
[{basic_get, ChPid, NoAck, LimiterPid}, infinity]});
-basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChPid, NoAck,
- _LimiterPid, CTag, QStates) ->
+basic_get(Q, _ChPid, NoAck, _LimiterPid, CTag, QStates)
+ when ?amqqueue_is_quorum(Q) ->
+ {Name, _} = Id = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
QState0 = get_quorum_state(Id, QName, QStates),
case rabbit_quorum_queue:basic_get(Q, NoAck, CTag, QState0) of
{ok, empty, QState} ->
@@ -1160,9 +1225,12 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP
[rabbit_misc:rs(QName), Reason])
end.
-basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid,
+basic_consume(Q, NoAck, ChPid,
LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag,
- ExclusiveConsume, Args, OkMsg, ActingUser, QState) ->
+ ExclusiveConsume, Args, OkMsg, ActingUser, QState)
+ when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
ok = check_consume_arguments(QName, Args),
case delegate:invoke(QPid,
{gen_server2, call,
@@ -1174,14 +1242,18 @@ basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid,
Err ->
Err
end;
-basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid,
+basic_consume(Q, _NoAck, _ChPid,
_LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
- _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) ->
+ _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates)
+ when ?amqqueue_is_quorum(Q) ->
{error, global_qos_not_supported_for_queue_type};
-basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q,
+basic_consume(Q,
NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount,
ConsumerTag, ExclusiveConsume, Args, OkMsg,
- ActingUser, QStates) ->
+ ActingUser, QStates)
+ when ?amqqueue_is_quorum(Q) ->
+ {Name, _} = Id = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
ok = check_consume_arguments(QName, Args),
QState0 = get_quorum_state(Id, QName, QStates),
{ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
@@ -1192,8 +1264,10 @@ basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q,
OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
-basic_cancel(#amqqueue{pid = QPid, type = classic}, ChPid, ConsumerTag, OkMsg, ActingUser,
- QState) ->
+basic_cancel(Q, ChPid, ConsumerTag, OkMsg, ActingUser,
+ QState)
+ when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
case delegate:invoke(QPid, {gen_server2, call,
[{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser},
infinity]}) of
@@ -1201,13 +1275,16 @@ basic_cancel(#amqqueue{pid = QPid, type = classic}, ChPid, ConsumerTag, OkMsg, A
{ok, QState};
Err -> Err
end;
-basic_cancel(#amqqueue{pid = {Name, _} = Id, type = quorum}, ChPid,
- ConsumerTag, OkMsg, _ActingUser, QStates) ->
+basic_cancel(Q, ChPid,
+ ConsumerTag, OkMsg, _ActingUser, QStates)
+ when ?amqqueue_is_quorum(Q) ->
+ {Name, _} = Id = amqqueue:get_pid(Q),
QState0 = get_quorum_state(Id, QStates),
{ok, QState} = rabbit_quorum_queue:basic_cancel(ConsumerTag, ChPid, OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
-notify_decorators(#amqqueue{pid = QPid}) ->
+notify_decorators(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
notify_sent(QPid, ChPid) ->
@@ -1267,10 +1344,10 @@ forget_all_durable(Node) ->
mnesia:sync_transaction(
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
- #amqqueue{_ = '_'}, write),
+ amqqueue:pattern_match_all(), write),
[forget_node_for_queue(Node, Q) ||
- #amqqueue{pid = Pid} = Q <- Qs,
- is_local_to_node(Pid, Node)],
+ Q <- Qs,
+ is_local_to_node(amqqueue:get_pid(Q), Node)],
ok
end),
ok.
@@ -1278,26 +1355,30 @@ forget_all_durable(Node) ->
%% Try to promote a slave while down - it should recover as a
%% master. We try to take the oldest slave here for best chance of
%% recovery.
-forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum,
- quorum_nodes = QN}) ->
+forget_node_for_queue(DeadNode, Q)
+ when ?amqqueue_is_quorum(Q) ->
+ QN = amqqueue:get_quorum_nodes(Q),
forget_node_for_queue(DeadNode, QN, Q);
-forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
+forget_node_for_queue(DeadNode, Q) ->
+ RS = amqqueue:get_recoverable_slaves(Q),
forget_node_for_queue(DeadNode, RS, Q).
-forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
+forget_node_for_queue(_DeadNode, [], Q) ->
%% No slaves to recover from, queue is gone.
%% Don't process_deletions since that just calls callbacks and we
%% are not really up.
+ Name = amqqueue:get_name(Q),
internal_delete1(Name, true);
%% Should not happen, but let's be conservative.
forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
forget_node_for_queue(DeadNode, T, Q);
-forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) ->
+forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
+ Type = amqqueue:get_type(Q),
case {node_permits_offline_promotion(H), Type} of
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
- {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ {true, classic} -> Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
ok = mnesia:write(rabbit_durable_queue, Q1, write);
{true, quorum} -> ok
end.
@@ -1329,37 +1410,40 @@ set_maximum_since_use(QPid, Age) ->
update_mirroring(QPid) ->
ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
-sync_mirrors(#amqqueue{pid = QPid}) ->
+sync_mirrors(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
sync_mirrors(QPid) ->
delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
-cancel_sync_mirrors(#amqqueue{pid = QPid}) ->
+cancel_sync_mirrors(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
cancel_sync_mirrors(QPid) ->
delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
-is_replicated(#amqqueue{type = quorum}) ->
+is_replicated(Q) when ?amqqueue_is_quorum(Q) ->
true;
is_replicated(Q) ->
rabbit_mirror_queue_misc:is_mirrored(Q).
-is_dead_exclusive(#amqqueue{exclusive_owner = none}) ->
+is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
-is_dead_exclusive(#amqqueue{exclusive_owner = Pid}) when is_pid(Pid) ->
+is_dead_exclusive(Q) when ?amqqueue_exclusive_owner_is_pid(Q) ->
+ Pid = amqqueue:get_pid(Q),
not rabbit_mnesia:is_process_alive(Pid).
on_node_up(Node) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
Qs = mnesia:match_object(rabbit_queue,
- #amqqueue{_ = '_'}, write),
+ amqqueue:pattern_match_all(), write),
[maybe_clear_recoverable_node(Node, Q) || Q <- Qs],
ok
end).
-maybe_clear_recoverable_node(Node,
- #amqqueue{sync_slave_pids = SPids,
- recoverable_slaves = RSs} = Q) ->
+maybe_clear_recoverable_node(Node, Q) ->
+ SPids = amqqueue:get_sync_slave_pids(Q),
+ RSs = amqqueue:get_recoverable_slaves(Q),
case lists:member(Node, RSs) of
true ->
%% There is a race with
@@ -1381,7 +1465,7 @@ maybe_clear_recoverable_node(Node,
if
DoClearNode -> RSs1 = RSs -- [Node],
store_queue(
- Q#amqqueue{recoverable_slaves = RSs1});
+ amqqueue:set_recoverable_slaves(Q, RSs1));
true -> ok
end;
false ->
@@ -1421,10 +1505,10 @@ partition_queues(T) ->
queues_to_delete_when_node_down(NodeDown) ->
rabbit_misc:execute_mnesia_transaction(fun () ->
- qlc:e(qlc:q([QName ||
- #amqqueue{name = QName, pid = Pid} = Q <- mnesia:table(rabbit_queue),
- qnode(Pid) == NodeDown andalso
- not rabbit_mnesia:is_process_alive(Pid) andalso
+ qlc:e(qlc:q([amqqueue:get_name(Q) ||
+ Q <- mnesia:table(rabbit_queue),
+ amqqueue:qnode(Q) == NodeDown andalso
+ not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)) andalso
(not rabbit_amqqueue:is_replicated(Q) orelse
rabbit_amqqueue:is_dead_exclusive(Q))]
))
@@ -1454,21 +1538,21 @@ notify_queues_deleted(QueueDeletions) ->
QueueDeletions).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
- auto_delete = false,
- arguments = [],
- pid = Pid,
- slave_pids = []}.
-
-immutable(Q) -> Q#amqqueue{pid = none,
- slave_pids = none,
- sync_slave_pids = none,
- recoverable_slaves = none,
- gm_pids = none,
- policy = none,
- decorators = none,
- state = none}.
+ pseudo_queue(QueueName, Pid, false).
+
+pseudo_queue(QueueName, Pid, Durable) ->
+ amqqueue:new(QueueName,
+ Pid,
+ Durable,
+ false,
+ none, % Owner,
+ [],
+ undefined, % VHost,
+ #{user => undefined}, % ActingUser
+ classic % Type
+ ).
+
+immutable(Q) -> amqqueue:set_immutable(Q).
deliver(Qs, Delivery) ->
deliver(Qs, Delivery, untracked),
@@ -1530,17 +1614,26 @@ deliver(Qs, Delivery = #delivery{flow = Flow,
{QPids, QuorumPids, QueueState}.
qpids([]) -> {[], [], []}; %% optimisation
-qpids([#amqqueue{pid = {LocalName, LeaderNode}, type = quorum, name = QName}]) ->
+qpids([Q]) when ?amqqueue_is_quorum(Q) ->
+ QName = amqqueue:get_name(Q),
+ {LocalName, LeaderNode} = amqqueue:get_pid(Q),
{[{{LocalName, LeaderNode}, QName}], [], []}; %% opt
-qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) ->
+qpids([Q]) ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
{[], [QPid], SPids}; %% opt
qpids(Qs) ->
{QuoPids, MPids, SPids} =
- lists:foldl(fun (#amqqueue{pid = QPid, type = quorum, name = QName},
- {QuoPidAcc, MPidAcc, SPidAcc}) ->
+ lists:foldl(fun (Q,
+ {QuoPidAcc, MPidAcc, SPidAcc})
+ when ?amqqueue_is_quorum(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
{[{QPid, QName} | QuoPidAcc], MPidAcc, SPidAcc};
- (#amqqueue{pid = QPid, slave_pids = SPids},
+ (Q,
{QuoPidAcc, MPidAcc, SPidAcc}) ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
{QuoPidAcc, [QPid | MPidAcc], [SPids | SPidAcc]}
end, {[], [], []}, Qs),
{QuoPids, MPids, lists:append(SPids)}.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5782bc6e23..c6684050dc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -17,6 +17,7 @@
-module(rabbit_amqqueue_process).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("amqqueue.hrl").
-behaviour(gen_server2).
@@ -35,7 +36,7 @@
%% Queue's state
-record(q, {
%% an #amqqueue record
- q,
+ q :: amqqueue:amqqueue(),
%% none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
active_consumer,
%% Set to true if a queue has ever had a consumer.
@@ -103,7 +104,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec init_with_backing_queue_state
- (rabbit_types:amqqueue(), atom(), tuple(), any(),
+ (amqqueue:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
#q{}.
@@ -153,13 +154,13 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
init(Q) ->
process_flag(trap_exit, true),
- ?store_proc_name(Q#amqqueue.name),
- {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
+ ?store_proc_name(amqqueue:get_name(Q)),
+ {ok, init_state(amqqueue:set_pid(Q, self())), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
?MODULE}.
init_state(Q) ->
- SingleActiveConsumerOn = case rabbit_misc:table_lookup(Q#amqqueue.arguments, <<"x-single-active-consumer">>) of
+ SingleActiveConsumerOn = case rabbit_misc:table_lookup(amqqueue:get_arguments(Q), <<"x-single-active-consumer">>) of
{bool, true} -> true;
_ -> false
end,
@@ -175,14 +176,16 @@ init_state(Q) ->
single_active_consumer_on = SingleActiveConsumerOn},
rabbit_event:init_stats_timer(State, #q.stats_timer).
-init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+init_it(Recover, From, State = #q{q = Q})
+ when ?amqqueue_exclusive_owner_is(Q, none) ->
init_it2(Recover, From, State);
%% You used to be able to declare an exclusive durable queue. Sadly we
%% need to still tidy up after that case, there could be the remnants
%% of one left over from an upgrade. So that's why we don't enforce
%% Recover = new here.
-init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+init_it(Recover, From, State = #q{q = Q0}) ->
+ Owner = amqqueue:get_exclusive_owner(Q0),
case rabbit_misc:is_process_alive(Owner) of
true -> erlang:monitor(process, Owner),
init_it2(Recover, From, State);
@@ -204,7 +207,9 @@ init_it2(Recover, From, State = #q{q = Q,
backing_queue_state = undefined}) ->
{Barrier, TermsOrNew} = recovery_status(Recover),
case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
- {Res, #amqqueue{} = Q1} when Res == created orelse Res == existing ->
+ {Res, Q1}
+ when ?is_amqqueue(Q1) andalso
+ (Res == created orelse Res == existing) ->
case matches(Recover, Q, Q1) of
true ->
ok = file_handle_cache:register_callback(
@@ -240,13 +245,14 @@ send_reply(From, Q) -> gen_server2:reply(From, Q).
matches(new, Q1, Q2) ->
%% i.e. not policy
- Q1#amqqueue.name =:= Q2#amqqueue.name andalso
- Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
- Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
- Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
- Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
- Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
+ amqqueue:get_name(Q1) =:= amqqueue:get_name(Q2) andalso
+ amqqueue:is_durable(Q1) =:= amqqueue:is_durable(Q2) andalso
+ amqqueue:is_auto_delete(Q1) =:= amqqueue:is_auto_delete(Q2) andalso
+ amqqueue:get_exclusive_owner(Q1) =:= amqqueue:get_exclusive_owner(Q2) andalso
+ amqqueue:get_arguments(Q1) =:= amqqueue:get_arguments(Q2) andalso
+ amqqueue:get_pid(Q1) =:= amqqueue:get_pid(Q2) andalso
+ amqqueue:get_slave_pids(Q1) =:= amqqueue:get_slave_pids(Q2);
+%% FIXME: Should v1 vs. v2 of the same record match?
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
@@ -259,8 +265,9 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+init_with_backing_queue_state(Q, BQ, BQS,
RateTRef, Deliveries, Senders, MTC) ->
+ Owner = amqqueue:get_exclusive_owner(Q),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -278,14 +285,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
+ QName = amqqueue:get_name(Q0),
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(
fun (BQS) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
[Q] = mnesia:read({rabbit_queue, QName}),
- Q2 = Q#amqqueue{state = stopped},
+ Q2 = amqqueue:set_state(Q, stopped),
rabbit_amqqueue:store_queue(Q2)
end),
BQ:terminate(R, BQS)
@@ -309,7 +317,7 @@ terminate(normal, State) -> %% delete case
%% If we crashed don't try to clean up the BQS, probably best to leave it.
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
- Q2 = Q#amqqueue{state = crashed},
+ Q2 = amqqueue:set_state(Q, crashed),
rabbit_misc:execute_mnesia_transaction(
fun() ->
rabbit_amqqueue:store_queue(Q2)
@@ -318,9 +326,10 @@ terminate(_Reason, State = #q{q = Q}) ->
end, State).
terminate_delete(EmitStats, Reason0,
- State = #q{q = #amqqueue{name = QName},
+ State = #q{q = Q,
backing_queue = BQ,
status = Status}) ->
+ QName = amqqueue:get_name(Q),
ActingUser = terminated_by(Status),
fun (BQS) ->
Reason = case Reason0 of
@@ -389,7 +398,8 @@ notify_decorators(State = #q{consumers = Consumers,
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
case rabbit_amqqueue:lookup(QName) of
- {ok, Q = #amqqueue{decorators = Ds}} ->
+ {ok, Q} ->
+ Ds = amqqueue:get_decorators(Q),
[ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
{error, not_found} ->
ok
@@ -418,7 +428,8 @@ process_args_policy(State = #q{q = Q,
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
end, State#q{args_policy_version = N + 1}, ArgsTable)).
-args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
+args_policy_lookup(Name, Resolve, Q) ->
+ Args = amqqueue:get_arguments(Q),
AName = <<"x-", Name/binary>>,
case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of
{undefined, undefined} -> undefined;
@@ -442,7 +453,8 @@ init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}.
init_dlx(undefined, State) ->
State#q{dlx = undefined};
-init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
+init_dlx(DLX, State = #q{q = Q}) ->
+ QName = amqqueue:get_name(Q),
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}.
@@ -596,8 +608,9 @@ send_or_record_confirm(#delivery{confirm = true,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- State = #q{q = #amqqueue{durable = true},
- msg_id_to_channel = MTC}) ->
+ State = #q{q = Q,
+ msg_id_to_channel = MTC})
+ when ?amqqueue_is_durable(Q) ->
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
{eventually, State#q{msg_id_to_channel = MTC1}};
send_or_record_confirm(#delivery{confirm = true,
@@ -815,7 +828,8 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
run_message_queue(true, State1)
end.
-should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
+should_auto_delete(#q{q = Q})
+ when not ?amqqueue_is_auto_delete(Q) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
@@ -896,7 +910,7 @@ is_unused(_State) -> rabbit_queue_consumers:count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-qname(#q{q = #amqqueue{name = QName}}) -> QName.
+qname(#q{q = Q}) -> amqqueue:get_name(Q).
backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1001,17 +1015,18 @@ stop(Reply, State) -> {stop, normal, Reply, State}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(name, #q{q = #amqqueue{name = Name}}) -> Name;
-i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
-i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
-i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
+i(name, #q{q = Q}) -> amqqueue:get_name(Q);
+i(durable, #q{q = Q}) -> amqqueue:is_durable(Q);
+i(auto_delete, #q{q = Q}) -> amqqueue:is_auto_delete(Q);
+i(arguments, #q{q = Q}) -> amqqueue:get_arguments(Q);
i(pid, _) ->
self();
-i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
+i(owner_pid, #q{q = Q}) when ?amqqueue_exclusive_owner_is(Q, none) ->
'';
-i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
- ExclusiveOwner;
-i(exclusive, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+i(owner_pid, #q{q = Q}) ->
+ amqqueue:get_exclusive_owner(Q);
+i(exclusive, #q{q = Q}) ->
+ ExclusiveOwner = amqqueue:get_exclusive_owner(Q),
is_pid(ExclusiveOwner);
i(policy, #q{q = Q}) ->
case rabbit_policy:name(Q) of
@@ -1061,27 +1076,27 @@ i(consumer_utilisation, #q{consumers = Consumers}) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, Q = #amqqueue{slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
+i(slave_pids, #q{q = Q0}) ->
+ Name = amqqueue:get_name(Q0),
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
case rabbit_mirror_queue_misc:is_mirrored(Q) of
false -> '';
- true -> SPids
+ true -> amqqueue:get_slave_pids(Q)
end;
-i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
- rabbit_amqqueue:lookup(Name),
+i(synchronised_slave_pids, #q{q = Q0}) ->
+ Name = amqqueue:get_name(Q0),
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
case rabbit_mirror_queue_misc:is_mirrored(Q) of
false -> '';
- true -> SSPids
+ true -> amqqueue:get_sync_slave_pids(Q)
end;
-i(recoverable_slaves, #q{q = #amqqueue{name = Name,
- durable = Durable}}) ->
- {ok, Q = #amqqueue{recoverable_slaves = Nodes}} =
- rabbit_amqqueue:lookup(Name),
+i(recoverable_slaves, #q{q = Q0}) ->
+ Name = amqqueue:get_name(Q0),
+ Durable = amqqueue:is_durable(Q0),
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
false -> '';
- true -> Nodes
+ true -> amqqueue:get_recoverable_slaves(Q)
end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
@@ -1090,7 +1105,8 @@ i(garbage_collection, _State) ->
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
-i(user_who_performed_action, #q{q = #amqqueue{options = Opts}}) ->
+i(user_who_performed_action, #q{q = Q}) ->
+ Opts = amqqueue:get_options(Q),
maps:get(user, Opts, ?UNKNOWN_USER);
i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:info(Item, BQS).
@@ -1174,7 +1190,8 @@ consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
{_, _} -> Low
end.
-prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+prioritise_info(Msg, _Len, #q{q = Q}) ->
+ DownPid = amqqueue:get_exclusive_owner(Q),
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
@@ -1225,7 +1242,8 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
- State = #q{q = #amqqueue{name = QName}}) ->
+ State = #q{q = Q}) ->
+ QName = amqqueue:get_name(Q),
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
@@ -1546,7 +1564,8 @@ handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);
-handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
+handle_cast(policy_changed, State = #q{q = Q0}) ->
+ Name = amqqueue:get_name(Q0),
%% We depend on the #q.q field being up to date at least WRT
%% policy (but not slave pids) in various places, so when it
%% changes we go and read it from Mnesia again.
@@ -1556,7 +1575,8 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
{ok, Q} = rabbit_amqqueue:lookup(Name),
noreply(process_args_policy(State#q{q = Q}));
-handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) ->
+handle_cast({sync_start, _, _}, State = #q{q = Q}) ->
+ Name = amqqueue:get_name(Q),
%% Only a slave should receive this, it means we are a duplicated master
rabbit_mirror_queue_misc:log_warning(
Name, "Stopping after receiving sync_start from another master", []),
@@ -1587,7 +1607,7 @@ handle_info(emit_stats, State) ->
{noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
- State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ State = #q{q = Q}) when ?amqqueue_exclusive_owner_is(Q, DownPid) ->
%% Exclusively owned queues must disappear with their owner. In
%% the case of clean shutdown we delete the queue synchronously in
%% the reader - although not required by the spec this seems to
@@ -1665,21 +1685,23 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
-log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) ->
+log_delete_exclusive(ConPid, #q{ q = Q }) ->
+ Resource = amqqueue:get_name(Q),
#resource{ name = QName, virtual_host = VHost } = Resource,
rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
"because its declaring connection ~p was closed",
[QName, VHost, ConPid]).
-log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) ->
+log_auto_delete(Reason, #q{ q = Q }) ->
+ Resource = amqqueue:get_name(Q),
#resource{ name = QName, virtual_host = VHost } = Resource,
rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
Reason,
[QName, VHost]).
needs_update_mirroring(Q, Version) ->
- {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
- DBVersion = UpQ#amqqueue.policy_version,
+ {ok, UpQ} = rabbit_amqqueue:lookup(amqqueue:get_name(Q)),
+ DBVersion = amqqueue:get_policy_version(UpQ),
case DBVersion > Version of
true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
false -> false
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 54a97b717e..7c5a70b529 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -26,7 +26,7 @@
%%----------------------------------------------------------------------------
--spec start_link(rabbit_types:amqqueue(), rabbit_prequeue:start_mode()) ->
+-spec start_link(amqqueue:amqqueue(), rabbit_prequeue:start_mode()) ->
{'ok', pid(), pid()}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index d8045276c6..7e40e54aca 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -32,7 +32,7 @@
-spec start_link() -> rabbit_types:ok_pid_or_error().
-spec start_queue_process
- (node(), rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') ->
+ (node(), amqqueue:amqqueue(), 'declare' | 'recovery' | 'slave') ->
pid().
%%----------------------------------------------------------------------------
@@ -41,7 +41,7 @@ start_link() ->
supervisor2:start_link(?MODULE, []).
start_queue_process(Node, Q, StartMode) ->
- #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
{ok, Sup} = find_for_vhost(VHost, Node),
{ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
QPid.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
new file mode 100644
index 0000000000..abd8635720
--- /dev/null
+++ b/src/rabbit_backing_queue.erl
@@ -0,0 +1,273 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_backing_queue).
+
+-export([info_keys/0]).
+
+-define(INFO_KEYS, [messages_ram, messages_ready_ram,
+ messages_unacknowledged_ram, messages_persistent,
+ message_bytes, message_bytes_ready,
+ message_bytes_unacknowledged, message_bytes_ram,
+ message_bytes_persistent, head_message_timestamp,
+ disk_reads, disk_writes, backing_queue_status,
+ messages_paged_out, message_bytes_paged_out]).
+
+%% We can't specify a per-queue ack/state with callback signatures
+-type ack() :: any().
+-type state() :: any().
+
+-type flow() :: 'flow' | 'noflow'.
+-type msg_ids() :: [rabbit_types:msg_id()].
+-type publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties(), boolean()}.
+-type delivered_publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties()}.
+-type fetch_result(Ack) ::
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack}).
+-type drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack}).
+-type recovery_terms() :: [term()] | 'non_clean_shutdown'.
+-type recovery_info() :: 'new' | recovery_terms().
+-type purged_msg_count() :: non_neg_integer().
+-type async_callback() ::
+ fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok').
+-type duration() :: ('undefined' | 'infinity' | number()).
+
+-type msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A).
+-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()).
+
+-type queue_mode() :: atom().
+
+-spec info_keys() -> rabbit_types:info_keys().
+
+%% Called on startup with a vhost and a list of durable queue names on this vhost.
+%% The queues aren't being started at this point, but this call allows the
+%% backing queue to perform any checking necessary for the consistency
+%% of those queues, or initialise any other shared resources.
+%%
+%% The list of queue recovery terms returned as {ok, Terms} must be given
+%% in the same order as the list of queue names supplied.
+-callback start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
+
+%% Called to tear down any state/resources for vhost. NB: Implementations should
+%% not depend on this function being called on shutdown and instead
+%% should hook into the rabbit supervision hierarchy.
+-callback stop(rabbit_types:vhost()) -> 'ok'.
+
+%% Initialise the backing queue and its state.
+%%
+%% Takes
+%% 1. the amqqueue record
+%% 2. a term indicating whether the queue is an existing queue that
+%% should be recovered or not. When 'new' is given, no recovery is
+%% taking place, otherwise a list of recovery terms is given, or
+%% the atom 'non_clean_shutdown' if no recovery terms are available.
+%% 3. an asynchronous callback which accepts a function of type
+%% backing-queue-state to backing-queue-state. This callback
+%% function can be safely invoked from any process, which makes it
+%% useful for passing messages back into the backing queue,
+%% especially as the backing queue does not have control of its own
+%% mailbox.
+-callback init(amqqueue:amqqueue(), recovery_info(),
+ async_callback()) -> state().
+
+%% Called on queue shutdown when queue isn't being deleted.
+-callback terminate(any(), state()) -> state().
+
+%% Called when the queue is terminating and needs to delete all its
+%% content.
+-callback delete_and_terminate(any(), state()) -> state().
+
+%% Called to clean up after a crashed queue. In this case we don't
+%% have a process and thus a state(), we are just removing on-disk data.
+-callback delete_crashed(amqqueue:amqqueue()) -> 'ok'.
+
+%% Remove all 'fetchable' messages from the queue, i.e. all messages
+%% except those that have been fetched already and are pending acks.
+-callback purge(state()) -> {purged_msg_count(), state()}.
+
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
+%% Publish a message.
+-callback publish(rabbit_types:basic_message(),
+ rabbit_types:message_properties(), boolean(), pid(), flow(),
+ state()) -> state().
+
+%% Like publish/6 but for batches of publishes.
+-callback batch_publish([publish()], pid(), flow(), state()) -> state().
+
+%% Called for messages which have already been passed straight
+%% out to a client. The queue will be empty for these calls
+%% (i.e. saves the round trip through the backing queue).
+-callback publish_delivered(rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), flow(),
+ state())
+ -> {ack(), state()}.
+
+%% Like publish_delivered/5 but for batches of publishes.
+-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
+ state())
+ -> {[ack()], state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ.
+-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
+
+%% Return ids of messages which have been confirmed since the last
+%% invocation of this function (or initialisation).
+%%
+%% Message ids should only appear in the result of drain_confirmed
+%% under the following circumstances:
+%%
+%% 1. The message appears in a call to publish_delivered/4 and the
+%% first argument (ack_required) is false; or
+%% 2. The message is fetched from the queue with fetch/2 and the first
+%% argument (ack_required) is false; or
+%% 3. The message is acked (ack/2 is called for the message); or
+%% 4. The message is fully fsync'd to disk in such a way that the
+%% recovery of the message is guaranteed in the event of a crash of
+%% this rabbit node (excluding hardware failure).
+%%
+%% In addition to the above conditions, a message id may only appear
+%% in the result of drain_confirmed if
+%% #message_properties.needs_confirming = true when the msg was
+%% published (through whichever means) to the backing queue.
+%%
+%% It is legal for the same message id to appear in the results of
+%% multiple calls to drain_confirmed, which means that the backing
+%% queue is not required to keep track of which messages it has
+%% already confirmed. The confirm will be issued to the publisher the
+%% first time the message id appears in the result of
+%% drain_confirmed. All subsequent appearances of that message id will
+%% be ignored.
+-callback drain_confirmed(state()) -> {msg_ids(), state()}.
+
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
+
+%% Produce the next message.
+-callback fetch(true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}.
+
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
+%% Acktags supplied are for messages which can now be forgotten
+%% about. Must return 1 msg_id per Ack, in the same order as Acks.
+-callback ack([ack()], state()) -> {msg_ids(), state()}.
+
+%% Reinsert messages into the queue which have already been delivered
+%% and were pending acknowledgement.
+-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(),
+ boolean(), A) -> {('stop' | 'cont'), A}),
+ A, state()) -> {A, state()}.
+
+%% How long is my queue?
+-callback len(state()) -> non_neg_integer().
+
+%% Is my queue empty?
+-callback is_empty(state()) -> boolean().
+
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
+
+%% For the next three functions, the assumption is that you're
+%% monitoring something like the ingress and egress rates of the
+%% queue. The RAM duration is thus the length of time represented by
+%% the messages held in RAM given the current rates. If you want to
+%% ignore all of this stuff, then do so, and return 0 in
+%% ram_duration/1.
+
+%% The target is to have no more messages in RAM than indicated by the
+%% duration and the current queue rates.
+-callback set_ram_duration_target(duration(), state()) -> state().
+
+%% Optionally recalculate the duration internally (likely to be just
+%% update your internal rates), and report how many seconds the
+%% messages in RAM represent given the current rates of the queue.
+-callback ram_duration(state()) -> {duration(), state()}.
+
+%% Should 'timeout' be called as soon as the queue process can manage
+%% (either on an empty mailbox, or when a timer fires)?
+-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'.
+
+%% Called (eventually) after needs_timeout returns 'idle' or 'timed'.
+%% Note this may be called more than once for each 'idle' or 'timed'
+%% returned from needs_timeout
+-callback timeout(state()) -> state().
+
+%% Called immediately before the queue hibernates.
+-callback handle_pre_hibernate(state()) -> state().
+
+%% Called when more credit has become available for credit_flow.
+-callback resume(state()) -> state().
+
+%% Used to help prioritisation in rabbit_amqqueue_process. The rate of
+%% inbound messages and outbound messages at the moment.
+-callback msg_rates(state()) -> {float(), float()}.
+
+-callback info(atom(), state()) -> any().
+
+%% Passed a function to be invoked with the relevant backing queue's
+%% state. Useful for when the backing queue or other components need
+%% to pass functions into the backing queue.
+-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
+
+%% Called prior to a publish or publish_delivered call. Allows the BQ
+%% to signal that it's already seen this message, (e.g. it was published
+%% or discarded previously) specifying whether to drop the message or reject it.
+-callback is_duplicate(rabbit_types:basic_message(), state())
+ -> {{true, drop} | {true, reject} | boolean(), state()}.
+
+-callback set_queue_mode(queue_mode(), state()) -> state().
+
+-callback zip_msgs_and_acks(delivered_publish(),
+ [ack()], Acc, state())
+ -> Acc.
+
+%% Called when rabbit_amqqueue_process receives a message via
+%% handle_info and it should be processed by the backing
+%% queue
+-callback handle_info(term(), state()) -> state().
+
+info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb1c754b5c..2b63b809b8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -15,7 +15,8 @@
%%
-module(rabbit_binding).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]).
-export([list_for_source/1, list_for_destination/1,
@@ -44,7 +45,7 @@
{'resources_missing',
[{'not_found', (rabbit_types:binding_source() |
rabbit_types:binding_destination())} |
- {'absent', rabbit_types:amqqueue()}]}).
+ {'absent', amqqueue:amqqueue()}]}).
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error(
@@ -53,7 +54,7 @@
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
fun((rabbit_types:exchange(),
- rabbit_types:exchange() | rabbit_types:amqqueue()) ->
+ rabbit_types:exchange() | amqqueue:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
-type bindings() :: [rabbit_types:binding()].
@@ -393,7 +394,8 @@ remove_transient_for_destination(DstName) ->
%%----------------------------------------------------------------------------
durable(#exchange{durable = D}) -> D;
-durable(#amqqueue{durable = D}) -> D.
+durable(Q) when ?is_amqqueue(Q) ->
+ amqqueue:is_durable(Q).
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 634789adab..dac3037bff 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -51,6 +51,7 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-behaviour(gen_server2).
@@ -1366,7 +1367,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
+ {ok, {Q, _CParams}} when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
QRef = qpid_to_ref(QPid),
QCons1 =
@@ -1636,7 +1638,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
Username, QueueStates0),
Q}
end) of
- {{ok, QueueStates}, Q = #amqqueue{pid = QPid, name = QName}} ->
+ {{ok, QueueStates}, Q} when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
CM1 = maps:put(
ActualConsumerTag,
{Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
@@ -1649,7 +1653,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
- {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
+ {ok, Q} when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
CM1 = maps:put(
ActualConsumerTag,
{Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
@@ -1674,7 +1680,8 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
+ {Q, _} = maps:get(ConsumerTag, ConsumerMapping),
+ QPid = amqqueue:get_pid(Q),
QRef = qpid_to_ref(QPid),
CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
@@ -1782,7 +1789,7 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
destination = DestinationName,
key = RoutingKey,
args = Arguments},
- fun (_X, Q = #amqqueue{}) ->
+ fun (_X, Q) when ?is_amqqueue(Q) ->
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
catch exit:Reason -> {error, Reason}
end;
@@ -1791,9 +1798,9 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
end,
Username) of
{error, {resources_missing, [{not_found, Name} | _]}} ->
- rabbit_misc:not_found(Name);
+ rabbit_amqqueue:not_found(Name);
{error, {resources_missing, [{absent, Q, Reason} | _]}} ->
- rabbit_misc:absent(Q, Reason);
+ rabbit_amqqueue:absent(Q, Reason);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
@@ -1956,8 +1963,9 @@ foreach_per_queue(F, UAL, Acc) ->
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
consumer_queue_refs(Consumers) ->
- lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
- <- maps:to_list(Consumers)]).
+ lists:usort([qpid_to_ref(amqqueue:get_pid(Q))
+ || {_Key, {Q, _CParams}} <- maps:to_list(Consumers),
+ amqqueue:is_amqqueue(Q)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -2011,9 +2019,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
%% since alternative algorithms to update queue_names less
%% frequently would in fact be more expensive in the common case.
{QNames1, QMons1} =
- lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
- {QNames0, QMons0}) ->
+ lists:foldl(fun (Q, {QNames0, QMons0}) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
QRef = qpid_to_ref(QPid),
+ QName = amqqueue:get_name(Q),
{case maps:is_key(QRef, QNames0) of
true -> QNames0;
false -> maps:put(QRef, QName, QNames0)
@@ -2287,7 +2296,7 @@ handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
exists -> {ok, QueueName, 0, 1};
- not_found -> rabbit_misc:not_found(QueueName)
+ not_found -> rabbit_amqqueue:not_found(QueueName)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
@@ -2338,11 +2347,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner, Username) of
- {new, #amqqueue{pid = QPid}} ->
+ {new, Q} when ?is_amqqueue(Q) ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
%% the connection shuts down.
+ QPid = amqqueue:get_pid(Q),
ok = case {Owner, CollectorPid} of
{none, _} -> ok;
{_, none} -> ok; %% Supports call from mgmt API
@@ -2357,7 +2367,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(Declare, ConnPid, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
- rabbit_misc:absent(Q, Reason);
+ rabbit_amqqueue:absent(Q, Reason);
{owner_died, _Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
@@ -2365,7 +2375,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{ok, QueueName, 0, 0}
end;
{error, {absent, Q, Reason}} ->
- rabbit_misc:absent(Q, Reason)
+ rabbit_amqqueue:absent(Q, Reason)
end;
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
@@ -2373,9 +2383,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
ConnPid, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
- {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
- rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
+ Fun = fun (Q0) ->
+ QStat = maybe_stat(NoWait, Q0),
+ {QStat, Q0}
+ end,
+ %% Note: no need to check if Q is an #amqqueue, with_or_die does it
+ {{ok, MessageCount, ConsumerCount}, Q} = rabbit_amqqueue:with_or_die(QueueName, Fun),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
{ok, QueueName, MessageCount, ConsumerCount};
handle_method(#'queue.delete'{queue = QueueNameBin,
@@ -2399,7 +2412,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
{ok, 0};
({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
- ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ ({absent, Q, Reason}) -> rabbit_amqqueue:absent(Q, Reason)
end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
new file mode 100644
index 0000000000..158eca4df8
--- /dev/null
+++ b/src/rabbit_core_ff.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_core_ff).
+
+-export([quorum_queue_migration/3]).
+
+-rabbit_feature_flag(
+ {quorum_queue,
+ #{desc => "Support queues of type `quorum`",
+ doc_url => "http://www.rabbitmq.com/quorum-queues.html",
+ stability => stable,
+ migration_fun => {?MODULE, quorum_queue_migration}
+ }}).
+
+quorum_queue_migration(FeatureName, _FeatureProps, enable) ->
+ Tables = [rabbit_queue,
+ rabbit_durable_queue],
+ rabbit_table:wait(Tables),
+ Fields = amqqueue:fields(amqqueue_v2),
+ migrate_to_amqqueue_with_type(FeatureName, Tables, Fields);
+quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) ->
+ Fields = amqqueue:fields(amqqueue_v2),
+ mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso
+ mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields.
+
+migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) ->
+ rabbit_log:info("Feature flag `~s`: migrating Mnesia table ~s...",
+ [FeatureName, Table]),
+ Fun = fun(Queue) -> amqqueue:upgrade_to(amqqueue_v2, Queue) end,
+ case mnesia:transform_table(Table, Fun, Fields) of
+ {atomic, ok} -> migrate_to_amqqueue_with_type(FeatureName,
+ Rest,
+ Fields);
+ {aborted, Reason} -> {error, Reason}
+ end;
+migrate_to_amqqueue_with_type(FeatureName, [], _) ->
+ rabbit_log:info("Feature flag `~s`: Mnesia tables migration done",
+ [FeatureName]),
+ ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9c879ad041..2480e7c3d0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -259,7 +259,7 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
{ok, X} -> X;
- {error, not_found} -> rabbit_misc:not_found(Name)
+ {error, not_found} -> rabbit_amqqueue:not_found(Name)
end.
list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
diff --git a/src/rabbit_fhc_helpers.erl b/src/rabbit_fhc_helpers.erl
index 90833795da..57dbd981f1 100644
--- a/src/rabbit_fhc_helpers.erl
+++ b/src/rabbit_fhc_helpers.erl
@@ -18,7 +18,7 @@
-export([clear_read_cache/0]).
--include("rabbit.hrl"). % For #amqqueue record definition.
+-include("amqqueue.hrl").
clear_read_cache() ->
case application:get_env(rabbit, fhc_read_buffering) of
@@ -37,7 +37,9 @@ clear_vhost_read_cache([VHost | Rest]) ->
clear_queue_read_cache([]) ->
ok;
-clear_queue_read_cache([#amqqueue{pid = MPid, slave_pids = SPids} | Rest]) ->
+clear_queue_read_cache([Q | Rest]) when ?is_amqqueue(Q) ->
+ MPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
%% Limit the action to the current node.
Pids = [P || P <- [MPid | SPids], node(P) =:= node()],
%% This function is executed in the context of the backing queue
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index f2426e156f..9e5bddb28a 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -26,7 +26,8 @@
-behaviour(gen_server2).
-behaviour(gm).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-include("gm_specs.hrl").
-record(state, { q,
@@ -37,7 +38,7 @@
}).
-spec start_link
- (rabbit_types:amqqueue(), pid() | 'undefined',
+ (amqqueue:amqqueue(), pid() | 'undefined',
rabbit_mirror_queue_master:death_fun(),
rabbit_mirror_queue_master:depth_fun()) ->
rabbit_types:ok_pid_or_error().
@@ -319,7 +320,8 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
+init([Q, GM, DeathFun, DepthFun]) when ?is_amqqueue(Q) ->
+ QueueName = amqqueue:get_name(Q),
?store_proc_name(QueueName),
GM1 = case GM of
undefined ->
@@ -345,9 +347,9 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, DeadGMPids},
- State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
- when node(MPid) =:= node() ->
+handle_cast({gm_deaths, DeadGMPids}, State = #state{q = Q}) when ?amqqueue_pid_runs_on_local_node(Q) ->
+ QueueName = amqqueue:get_name(Q),
+ MPid = amqqueue:get_pid(Q),
case rabbit_mirror_queue_misc:remove_from_queue(
QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids, ExtraNodes} ->
@@ -373,14 +375,15 @@ handle_cast({gm_deaths, DeadGMPids},
error(unexpected_mirrored_state)
end;
-handle_cast(request_depth, State = #state { depth_fun = DepthFun,
- q = #amqqueue { name = QName, pid = MPid }}) ->
+handle_cast(request_depth, State = #state{depth_fun = DepthFun, q = QArg}) when ?is_amqqueue(QArg) ->
+ QName = amqqueue:get_name(QArg),
+ MPid = amqqueue:get_pid(QArg),
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{ pid = MPid }} ->
- ok = DepthFun(),
- noreply(State);
- _ ->
- {stop, shutdown, State}
+ {ok, QFound} when ?amqqueue_pid_equals(QFound, MPid) ->
+ ok = DepthFun(),
+ noreply(State);
+ _ ->
+ {stop, shutdown, State}
end;
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4cc6856442..a5736a36fc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -34,7 +34,8 @@
-behaviour(rabbit_backing_queue).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-record(state, { name,
gm,
@@ -68,7 +69,7 @@
-spec sender_death_fun() -> death_fun().
-spec depth_fun() -> depth_fun().
--spec init_with_existing_bq(rabbit_types:amqqueue(), atom(), any()) ->
+-spec init_with_existing_bq(amqqueue:amqqueue(), atom(), any()) ->
master_state().
-spec stop_mirroring(master_state()) -> {atom(), any()}.
-spec sync_mirrors(stats_fun(), stats_fun(), master_state()) ->
@@ -100,43 +101,46 @@ init(Q, Recover, AsyncCallback) ->
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
-init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
+init_with_existing_bq(Q0, BQ, BQS) when ?is_amqqueue(Q0) ->
+ QName = amqqueue:get_name(Q0),
case rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), depth_fun()) of
- {ok, CPid} ->
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- Self = self(),
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue{gm_pids = GMPids}]
- = mnesia:read({rabbit_queue, QName}),
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
- state = live})
- end),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- %% We need synchronous add here (i.e. do not return until the
- %% slave is running) so that when queue declaration is finished
- %% all slaves are up; we don't want to end up with unsynced slaves
- %% just by declaring a new queue. But add can't be synchronous all
- %% the time as it can be called by slaves and that's
- %% deadlock-prone.
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
- #state { name = QName,
- gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = #{},
- confirmed = [],
- known_senders = sets:new(),
- wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
- {error, Reason} ->
- %% The GM can shutdown before the coordinator has started up
- %% (lost membership or missing group), thus the start_link of
- %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
- % is trapping exists
- throw({coordinator_not_started, Reason})
+ Q0, undefined, sender_death_fun(), depth_fun()) of
+ {ok, CPid} ->
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ Fun = fun () ->
+ [Q1] = mnesia:read({rabbit_queue, QName}),
+ true = amqqueue:is_amqqueue(Q1),
+ GMPids0 = amqqueue:get_gm_pids(Q1),
+ GMPids1 = [{GM, Self} | GMPids0],
+ Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
+ Q3 = amqqueue:set_state(Q2, live),
+ ok = rabbit_amqqueue:store_queue(Q3)
+ end,
+ ok = rabbit_misc:execute_mnesia_transaction(Fun),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
+ #state{name = QName,
+ gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = #{},
+ confirmed = [],
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000)};
+ {error, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ % is trapping exists
+ throw({coordinator_not_started, Reason})
end.
stop_mirroring(State = #state { coordinator = CPid,
@@ -156,7 +160,8 @@ sync_mirrors(HandleInfo, EmitStats,
QName, "Synchronising: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
- {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName),
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ SPids = amqqueue:get_slave_pids(Q),
SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
Log("batch size: ~p", [SyncBatchSize]),
Ref = make_ref(),
@@ -193,8 +198,8 @@ terminate(Reason,
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
%% node.
- {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
- rabbit_amqqueue:lookup(QName),
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ SSPids = amqqueue:get_sync_slave_pids(Q),
case SSPids =:= [] andalso
rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
true -> %% Remove the whole queue to avoid data loss
@@ -213,7 +218,8 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ SPids = amqqueue:get_slave_pids(Q),
rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
purge(State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ac933d4df4..9fc70527c0 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -31,7 +31,8 @@
%% for testing only
-export([module/1]).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-define(HA_NODES_MODULE, rabbit_mirror_queue_mode_nodes).
@@ -61,18 +62,18 @@
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
'ok'.
--spec store_updated_slaves(rabbit_types:amqqueue()) ->
- rabbit_types:amqqueue().
--spec initial_queue_node(rabbit_types:amqqueue(), node()) -> node().
--spec suggested_queue_nodes(rabbit_types:amqqueue()) ->
+-spec store_updated_slaves(amqqueue:amqqueue()) ->
+ amqqueue:amqqueue().
+-spec initial_queue_node(amqqueue:amqqueue(), node()) -> node().
+-spec suggested_queue_nodes(amqqueue:amqqueue()) ->
{node(), [node()]}.
--spec is_mirrored(rabbit_types:amqqueue()) -> boolean().
+-spec is_mirrored(amqqueue:amqqueue()) -> boolean().
-spec update_mirrors
- (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'.
+ (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.
-spec update_mirrors
- (rabbit_types:amqqueue()) -> 'ok'.
--spec maybe_drop_master_after_sync(rabbit_types:amqqueue()) -> 'ok'.
--spec maybe_auto_sync(rabbit_types:amqqueue()) -> 'ok'.
+ (amqqueue:amqqueue()) -> 'ok'.
+-spec maybe_drop_master_after_sync(amqqueue:amqqueue()) -> 'ok'.
+-spec maybe_auto_sync(amqqueue:amqqueue()) -> 'ok'.
-spec log_info(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
-spec log_warning(rabbit_amqqueue:name(), string(), [any()]) -> 'ok'.
@@ -86,10 +87,11 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here. Or, gm group could've altered. see rabbitmq-server#914
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- sync_slave_pids = SyncSPids,
- gm_pids = GMPids }] ->
+ [Q0] when ?is_amqqueue(Q0) ->
+ QPid = amqqueue:get_pid(Q0),
+ SPids = amqqueue:get_slave_pids(Q0),
+ SyncSPids = amqqueue:get_sync_slave_pids(Q0),
+ GMPids = amqqueue:get_gm_pids(Q0),
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -109,7 +111,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ -> promote_slave(Alive)
end,
DoNotPromote = SyncSPids =:= [] andalso
- rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+ rabbit_policy:get(<<"ha-promote-on-failure">>, Q0) =:= <<"when-synced">>,
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, DeadPids, []};
@@ -124,23 +126,23 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% we're ok to update mnesia; or we have
%% become the master. If gm altered,
%% we have no choice but to proceed.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
+ Q1 = amqqueue:set_pid(Q0, QPid1),
+ Q2 = amqqueue:set_slave_pids(Q1, SPids1),
+ Q3 = amqqueue:set_gm_pids(Q2, AliveGM),
+ store_updated_slaves(Q3),
%% If we add and remove nodes at the
%% same time we might tell the old
%% master we need to sync and then
%% shut it down. So let's check if
%% the new master needs to sync.
- maybe_auto_sync(Q1),
- {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+ maybe_auto_sync(Q3),
+ {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q3, DeadGMPids)};
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
+ Q1 = amqqueue:set_slave_pids(Q0, Alive),
+ Q2 = amqqueue:set_gm_pids(Q1, AliveGM),
+ store_updated_slaves(Q2),
{ok, QPid1, DeadPids, []}
end
end
@@ -185,13 +187,12 @@ on_vhost_up(VHost) ->
fun () ->
mnesia:foldl(
fun
- (#amqqueue{name = #resource{virtual_host = OtherVhost}},
- QNames0) when OtherVhost =/= VHost ->
+ (Q, QNames0) when not ?amqqueue_vhost_equals(Q, VHost) ->
QNames0;
- (Q = #amqqueue{name = QName,
- pid = Pid,
- slave_pids = SPids,
- type = classic}, QNames0) ->
+ (Q, QNames0) when ?amqqueue_is_classic(Q) ->
+ QName = amqqueue:get_name(Q),
+ Pid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
%% We don't want to pass in the whole
%% cluster - we don't want a situation
%% where starting one node causes us to
@@ -221,7 +222,10 @@ drop_mirrors(QName, Nodes) ->
drop_mirror(QName, MirrorNode) ->
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids }} ->
+ {ok, Q} when ?is_amqqueue(Q) ->
+ Name = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
{error, {queue_not_mirrored_on_node, MirrorNode}};
@@ -247,7 +251,7 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
{ok, _} ->
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
@@ -285,19 +289,21 @@ log_warning(QName, Fmt, Args) ->
rabbit_log_mirroring:warning("Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids,
- recoverable_slaves = RS}) ->
+store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
+ SPids = amqqueue:get_slave_pids(Q0),
+ SSPids = amqqueue:get_sync_slave_pids(Q0),
+ RS0 = amqqueue:get_recoverable_slaves(Q0),
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- recoverable_slaves = update_recoverable(SPids, RS),
- state = live},
- ok = rabbit_amqqueue:store_queue(Q1),
+ Q1 = amqqueue:set_sync_slave_pids(Q0, SSPids1),
+ RS1 = update_recoverable(SPids, RS0),
+ Q2 = amqqueue:set_recoverable_slaves(Q1, RS1),
+ Q3 = amqqueue:set_state(Q2, live),
+ ok = rabbit_amqqueue:store_queue(Q3),
%% Wake it up so that we emit a stats event
- rabbit_amqqueue:notify_policy_changed(Q1),
- Q1.
+ rabbit_amqqueue:notify_policy_changed(Q3),
+ Q3.
%% Recoverable nodes are those which we could promote if the whole
%% cluster were to suddenly stop and we then lose the master; i.e. all
@@ -346,13 +352,14 @@ stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
rabbit_misc:execute_mnesia_transaction(fun () ->
- [Q] = mnesia:read({rabbit_queue, QName}),
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [],
- %% Restarted slaves on running nodes can
- %% ensure old incarnations are stopped using
- %% the pending slave pids.
- slave_pids_pending_shutdown = PendingSlavePids})
+ [Q0] = mnesia:read({rabbit_queue, QName}),
+ Q1 = amqqueue:set_gm_pids(Q0, []),
+ Q2 = amqqueue:set_slave_pids(Q1, []),
+ %% Restarted slaves on running nodes can
+ %% ensure old incarnations are stopped using
+ %% the pending slave pids.
+ Q3 = amqqueue:set_slave_pids_pending_shutdown(Q2, PendingSlavePids),
+ rabbit_mirror_queue_misc:store_updated_slaves(Q3)
end),
ok = gm:forget_group(QName).
@@ -373,7 +380,8 @@ suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
%% The third argument exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction
%% or both.
-suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, DefNode, All) ->
+suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
+ Owner = amqqueue:get_exclusive_owner(Q),
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> DefNode;
@@ -395,7 +403,7 @@ policy(Policy, Q) ->
P -> P
end.
-module(#amqqueue{} = Q) ->
+module(Q) when ?is_amqqueue(Q) ->
case rabbit_policy:get(<<"ha-mode">>, Q) of
undefined -> not_mirrored;
Mode -> module(Mode)
@@ -430,16 +438,18 @@ is_mirrored_ha_nodes(Q) ->
_ -> false
end.
-actual_queue_nodes(#amqqueue{pid = MPid,
- slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+actual_queue_nodes(Q) when ?is_amqqueue(Q) ->
+ MPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
+ SSPids = amqqueue:get_sync_slave_pids(Q),
Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
end, Nodes(SPids), Nodes(SSPids)}.
-maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+maybe_auto_sync(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
case policy(<<"ha-sync-mode">>, Q) of
<<"automatic">> ->
spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
@@ -447,23 +457,27 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
ok
end.
-sync_queue(Q) ->
- rabbit_amqqueue:with(
- Q, fun(#amqqueue{pid = QPid, type = classic}) ->
- rabbit_amqqueue:sync_mirrors(QPid);
- (#amqqueue{type = quorum}) ->
- {error, quorum_queue_not_supported}
- end).
-
-cancel_sync_queue(Q) ->
- rabbit_amqqueue:with(
- Q, fun(#amqqueue{pid = QPid, type = classic}) ->
- rabbit_amqqueue:cancel_sync_mirrors(QPid);
- (#amqqueue{type = quorum}) ->
- {error, quorum_queue_not_supported}
- end).
-
-sync_batch_size(#amqqueue{} = Q) ->
+sync_queue(Q0) ->
+ F = fun
+ (Q) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ rabbit_amqqueue:sync_mirrors(QPid);
+ (Q) when ?amqqueue_is_quorum(Q) ->
+ {error, quorum_queue_not_supported}
+ end,
+ rabbit_amqqueue:with(Q0, F).
+
+cancel_sync_queue(Q0) ->
+ F = fun
+ (Q) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ rabbit_amqqueue:cancel_sync_mirrors(QPid);
+ (Q) when ?amqqueue_is_quorum(Q) ->
+ {error, quorum_queue_not_supported}
+ end,
+ rabbit_amqqueue:with(Q0, F).
+
+sync_batch_size(Q) when ?is_amqqueue(Q) ->
case policy(<<"ha-sync-batch-size">>, Q) of
none -> %% we need this case because none > 1 == true
default_batch_size();
@@ -479,14 +493,17 @@ default_batch_size() ->
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
?DEFAULT_BATCH_SIZE).
-update_mirrors(OldQ = #amqqueue{pid = QPid},
- NewQ = #amqqueue{pid = QPid}) ->
+update_mirrors(OldQ, NewQ) when ?amqqueue_pids_are_equal(OldQ, NewQ) ->
+ % Note: we do want to ensure both queues have same pid
+ QPid = amqqueue:get_pid(OldQ),
+ QPid = amqqueue:get_pid(NewQ),
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
_ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors(Q = #amqqueue{name = QName}) ->
+update_mirrors(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
{OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
{NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
@@ -512,8 +529,9 @@ update_mirrors(Q = #amqqueue{name = QName}) ->
%% We don't just call update_mirrors/2 here since that could decide to
%% start a slave for some other reason, and since we are the slave ATM
%% that allows complicated deadlocks.
-maybe_drop_master_after_sync(Q = #amqqueue{name = QName,
- pid = MPid}) ->
+maybe_drop_master_after_sync(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ MPid = amqqueue:get_pid(Q),
{DesiredMNode, DesiredSNodes} = suggested_queue_nodes(Q),
case node(MPid) of
DesiredMNode -> ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index bf0a5a7ed0..637d49035e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -35,8 +35,9 @@
-behaviour(gen_server2).
-behaviour(gm).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-include("gm_specs.hrl").
%%----------------------------------------------------------------------------
@@ -76,8 +77,9 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(Q) ->
- ?store_proc_name(Q#amqqueue.name),
+init(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ ?store_proc_name(QName),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}, ?MODULE}.
@@ -85,7 +87,8 @@ init(Q) ->
go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
go(SPid, async) -> gen_server2:cast(SPid, go).
-handle_go(Q = #amqqueue{name = QName}) ->
+handle_go(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -119,10 +122,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- Q1 = Q #amqqueue { pid = QPid },
+ QPid = amqqueue:get_pid(Q),
_ = BQ:delete_crashed(Q), %% For crash recovery
- BQS = bq_init(BQ, Q1, new),
- State = #state { q = Q1,
+ BQS = bq_init(BQ, Q, new),
+ State = #state { q = Q,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -139,7 +142,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
},
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
- rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
+ rabbit_mirror_queue_misc:maybe_auto_sync(Q),
{ok, State};
{stale, StalePid} ->
rabbit_mirror_queue_misc:log_warning(
@@ -163,8 +166,11 @@ handle_go(Q = #amqqueue{name = QName}) ->
init_it(Self, GM, Node, QName) ->
case mnesia:read({rabbit_queue, QName}) of
- [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids,
- slave_pids_pending_shutdown = PSPids}] ->
+ [Q] when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
+ GMPids = amqqueue:get_gm_pids(Q),
+ PSPids = amqqueue:get_slave_pids_pending_shutdown(Q),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> stop_pending_slaves(QName, PSPids),
add_slave(Q, Self, GM),
@@ -175,12 +181,11 @@ init_it(Self, GM, Node, QName) ->
end;
[SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
true -> existing;
- false -> GMPids1 = [T || T = {_, S} <- GMPids,
- S =/= SPid],
- Q1 = Q#amqqueue{
- slave_pids = SPids -- [SPid],
- gm_pids = GMPids1},
- add_slave(Q1, Self, GM),
+ false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid],
+ SPids1 = SPids -- [SPid],
+ Q1 = amqqueue:set_slave_pids(Q, SPids1),
+ Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
+ add_slave(Q2, Self, GM),
{new, QPid, GMPids1}
end
end;
@@ -212,9 +217,14 @@ stop_pending_slaves(QName, Pids) ->
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
+add_slave(Q0, New, GM) when ?is_amqqueue(Q0) ->
+ SPids = amqqueue:get_slave_pids(Q0),
+ GMPids = amqqueue:get_gm_pids(Q0),
+ SPids1 = SPids ++ [New],
+ GMPids1 = [{GM, New} | GMPids],
+ Q1 = amqqueue:set_slave_pids(Q0, SPids1),
+ Q2 = amqqueue:set_gm_pids(Q1, GMPids1),
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2).
handle_call(go, _From, {not_started, Q} = NotStarted) ->
case handle_go(Q) of
@@ -223,10 +233,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
end;
handle_call({gm_deaths, DeadGMPids}, From,
- State = #state{ gm = GM,
- q = Q = #amqqueue{ name = QName, pid = MPid },
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #state{ gm = GM, q = Q,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ MPid = amqqueue:get_pid(Q),
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
@@ -269,7 +280,9 @@ handle_call({gm_deaths, DeadGMPids}, From,
%% death. That is all process_death does, create
%% some traffic.
ok = gm:broadcast(GM, process_death),
- noreply(State #state { q = Q #amqqueue { pid = Pid } })
+ Q1 = amqqueue:set_pid(Q, Pid),
+ State1 = State#state{q = Q1},
+ noreply(State1)
end
end;
@@ -285,20 +298,22 @@ handle_cast(go, {not_started, Q} = NotStarted) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) ->
+handle_cast({gm, Instruction}, State = #state{q = Q0}) when ?is_amqqueue(Q0) ->
+ QName = amqqueue:get_name(Q0),
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{slave_pids = SPids}} ->
- case lists:member(self(), SPids) of
- true ->
- handle_process_result(process_instruction(Instruction, State));
- false ->
- %% Potentially a duplicated slave caused by a partial partition,
- %% will stop as a new slave could start unaware of our presence
- {stop, shutdown, State}
- end;
- {error, not_found} ->
- %% Would not expect this to happen after fixing #953
- {stop, shutdown, State}
+ {ok, Q1} when ?is_amqqueue(Q1) ->
+ SPids = amqqueue:get_slave_pids(Q1),
+ case lists:member(self(), SPids) of
+ true ->
+ handle_process_result(process_instruction(Instruction, State));
+ false ->
+ %% Potentially a duplicated slave caused by a partial partition,
+ %% will stop as a new slave could start unaware of our presence
+ {stop, shutdown, State}
+ end;
+ {error, not_found} ->
+ %% Would not expect this to happen after fixing #953
+ {stop, shutdown, State}
end;
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
@@ -521,11 +536,16 @@ handle_terminate([_SPid], _Reason) ->
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _State) -> self();
-i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
-i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
-i(Item, _State) -> throw({bad_argument, Item}).
+i(pid, _State) ->
+ self();
+i(name, #state{q = Q}) when ?is_amqqueue(Q) ->
+ amqqueue:get_name(Q);
+i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) ->
+ amqqueue:get_pid(Q);
+i(is_synchronised, #state{depth_delta = DD}) ->
+ DD =:= 0;
+i(Item, _State) ->
+ throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -550,7 +570,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
message = #basic_message {
id = MsgId,
is_persistent = true } },
- MS, #state { q = #amqqueue { durable = true } }) ->
+ MS, #state{q = Q}) when ?amqqueue_is_durable(Q) ->
maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
@@ -595,7 +615,7 @@ handle_process_result({stop, State}) -> {stop, normal, State}.
-spec promote_me({pid(), term()}, #state{}) -> no_return().
-promote_me(From, #state { q = Q = #amqqueue { name = QName },
+promote_me(From, #state { q = Q0,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -603,13 +623,14 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
sender_queues = SQ,
msg_id_ack = MA,
msg_id_status = MS,
- known_senders = KS }) ->
+ known_senders = KS}) when ?is_amqqueue(Q0) ->
+ QName = amqqueue:get_name(Q0),
rabbit_mirror_queue_misc:log_info(QName, "Promoting slave ~s to master~n",
[rabbit_misc:pid_to_string(self())]),
- Q1 = Q #amqqueue { pid = self() },
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
- rabbit_mirror_queue_master:depth_fun()),
+ Q1 = amqqueue:set_pid(Q0, self()),
+ DeathFun = rabbit_mirror_queue_master:sender_death_fun(),
+ DepthFun = rabbit_mirror_queue_master:depth_fun(),
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q1, GM, DeathFun, DepthFun),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
@@ -1040,19 +1061,22 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
-record_synchronised(#amqqueue { name = QName }) ->
+record_synchronised(Q0) when ?is_amqqueue(Q0) ->
+ QName = amqqueue:get_name(Q0),
Self = self(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2),
- {ok, Q2}
- end
- end) of
- ok -> ok;
- {ok, Q} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q)
+ F = fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1] when ?is_amqqueue(Q1) ->
+ SSPids = amqqueue:get_sync_slave_pids(Q1),
+ SSPids1 = [Self | SSPids],
+ Q2 = amqqueue:set_sync_slave_pids(Q1, SSPids1),
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q2}
+ end
+ end,
+ case rabbit_misc:execute_mnesia_transaction(F) of
+ ok -> ok;
+ {ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2)
end.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7f07b205ff..ed6ffc9ecd 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -36,7 +36,8 @@
-behaviour(rabbit_runtime_parameter).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-import(rabbit_misc, [pget/2, pget/3]).
@@ -59,16 +60,22 @@ register() ->
rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE),
rabbit_registry:register(runtime_parameter, <<"operator_policy">>, ?MODULE).
-name(#amqqueue{policy = Policy}) -> name0(Policy);
+name(Q) when ?is_amqqueue(Q) ->
+ Policy = amqqueue:get_policy(Q),
+ name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).
-name_op(#amqqueue{operator_policy = Policy}) -> name0(Policy);
+name_op(Q) when ?is_amqqueue(Q) ->
+ OpPolicy = amqqueue:get_operator_policy(Q),
+ name0(OpPolicy);
name_op(#exchange{operator_policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-effective_definition(#amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+effective_definition(Q) when ?is_amqqueue(Q) ->
+ Policy = amqqueue:get_policy(Q),
+ OpPolicy = amqqueue:get_operator_policy(Q),
effective_definition0(Policy, OpPolicy);
effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) ->
effective_definition0(Policy, OpPolicy).
@@ -90,10 +97,15 @@ effective_definition0(Policy, OpPolicy) ->
end,
lists:umerge(Keys, OpKeys)).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = match(Name),
- operator_policy = match_op(Name)};
-set(X = #exchange{name = Name}) -> X#exchange{policy = match(Name),
- operator_policy = match_op(Name)}.
+set(Q0) when ?is_amqqueue(Q0) ->
+ Name = amqqueue:get_name(Q0),
+ Policy = match(Name),
+ OpPolicy = match_op(Name),
+ Q1 = amqqueue:set_policy(Q0, Policy),
+ Q2 = amqqueue:set_operator_policy(Q1, OpPolicy),
+ Q2;
+set(X = #exchange{name = Name}) ->
+ X#exchange{policy = match(Name), operator_policy = match_op(Name)}.
match(Name = #resource{virtual_host = VHost}) ->
match(Name, list(VHost)).
@@ -101,7 +113,9 @@ match(Name = #resource{virtual_host = VHost}) ->
match_op(Name = #resource{virtual_host = VHost}) ->
match(Name, list_op(VHost)).
-get(Name, #amqqueue{policy = Policy, operator_policy = OpPolicy}) ->
+get(Name, Q) when ?is_amqqueue(Q) ->
+ Policy = amqqueue:get_policy(Q),
+ OpPolicy = amqqueue:get_operator_policy(Q),
get0(Name, Policy, OpPolicy);
get(Name, #exchange{policy = Policy, operator_policy = OpPolicy}) ->
get0(Name, Policy, OpPolicy);
@@ -170,7 +184,7 @@ recover() ->
%% variants.
recover0() ->
Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
- Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
+ Qs = mnesia:dirty_match_object(rabbit_durable_queue, amqqueue:pattern_match_all()),
Policies = list(),
OpPolicies = list_op(),
[rabbit_misc:execute_mnesia_transaction(
@@ -182,15 +196,18 @@ recover0() ->
operator_policy = match(Name, OpPolicies)}),
write)
end) || X = #exchange{name = Name} <- Xs],
- [rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:write(
- rabbit_durable_queue,
- rabbit_queue_decorator:set(
- Q#amqqueue{policy = match(Name, Policies),
- operator_policy = match(Name, OpPolicies)}),
- write)
- end) || Q = #amqqueue{name = Name} <- Qs],
+ [begin
+ QName = amqqueue:get_name(Q0),
+ Policy1 = match(QName, Policies),
+ Q1 = amqqueue:set_policy(Q0, Policy1),
+ OpPolicy1 = match(QName, OpPolicies),
+ Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
+ Q3 = rabbit_queue_decorator:set(Q2),
+ F = fun () ->
+ mnesia:write(rabbit_durable_queue, Q3, write)
+ end,
+ rabbit_misc:execute_mnesia_transaction(F)
+ end || Q0 <- Qs],
ok.
invalid_file() ->
@@ -395,25 +412,26 @@ update_exchange(X = #exchange{name = XName,
end
end.
-update_queue(Q = #amqqueue{name = QName,
- policy = OldPolicy,
- operator_policy = OldOpPolicy},
- Policies, OpPolicies) ->
+update_queue(Q0, Policies, OpPolicies) when ?is_amqqueue(Q0) ->
+ QName = amqqueue:get_name(Q0),
+ OldPolicy = amqqueue:get_policy(Q0),
+ OldOpPolicy = amqqueue:get_operator_policy(Q0),
case {match(QName, Policies), match(QName, OpPolicies)} of
{OldPolicy, OldOpPolicy} -> no_change;
{NewPolicy, NewOpPolicy} ->
- NewQueue = rabbit_amqqueue:update(
- QName,
- fun(Q1) ->
- rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy,
- operator_policy = NewOpPolicy,
- policy_version =
- Q1#amqqueue.policy_version + 1 })
- end),
+ F = fun (QFun0) ->
+ QFun1 = amqqueue:set_policy(QFun0, NewPolicy),
+ QFun2 = amqqueue:set_operator_policy(QFun1, NewOpPolicy),
+ NewPolicyVersion = amqqueue:get_policy_version(QFun2) + 1,
+ QFun3 = amqqueue:set_policy_version(QFun2, NewPolicyVersion),
+ rabbit_queue_decorator:set(QFun3)
+ end,
+ NewQueue = rabbit_amqqueue:update(QName, F),
case NewQueue of
- #amqqueue{} = Q1 -> {Q, Q1};
- not_found -> {Q, Q }
+ Q1 when ?is_amqqueue(Q1) ->
+ {Q0, Q1};
+ not_found ->
+ {Q0, Q0}
end
end.
@@ -421,7 +439,7 @@ notify(no_change)->
ok;
notify({X1 = #exchange{}, X2 = #exchange{}}) ->
rabbit_exchange:policy_changed(X1, X2);
-notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
+notify({Q1, Q2}) when ?is_amqqueue(Q1), ?is_amqqueue(Q2) ->
rabbit_amqqueue:policy_changed(Q1, Q2).
match(Name, Policies) ->
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 63dbec545b..b750482ba1 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -29,7 +29,8 @@
-behaviour(gen_server2).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
%%----------------------------------------------------------------------------
@@ -37,7 +38,7 @@
-type start_mode() :: 'declare' | 'recovery' | 'slave'.
--spec start_link(rabbit_types:amqqueue(), start_mode(), pid())
+-spec start_link(amqqueue:amqqueue(), start_mode(), pid())
-> rabbit_types:ok_pid_or_error().
%%----------------------------------------------------------------------------
@@ -57,20 +58,22 @@ init({Q, StartMode, Marker}) ->
init(Q, master) -> rabbit_amqqueue_process:init(Q);
init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
-init(#amqqueue{name = QueueName}, restart) ->
- {ok, Q = #amqqueue{pid = QPid,
- slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
+init(Q0, restart) when ?is_amqqueue(Q0) ->
+ QueueName = amqqueue:get_name(Q0),
+ {ok, Q1} = rabbit_amqqueue:lookup(QueueName),
+ QPid = amqqueue:get_pid(Q1),
+ SPids = amqqueue:get_slave_pids(Q1),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
case rabbit_mnesia:is_process_alive(QPid) of
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
- rabbit_mirror_queue_slave:init(Q); %% [1]
+ rabbit_mirror_queue_slave:init(Q1); %% [1]
false -> case LocalOrMasterDown andalso Slaves =:= [] of
- true -> crash_restart(Q); %% [2]
+ true -> crash_restart(Q1); %% [2]
false -> timer:sleep(25),
- init(Q, restart) %% [3]
+ init(Q1, restart) %% [3]
end
end.
%% [1] There is a master on another node. Regardless of whether we
@@ -83,10 +86,12 @@ init(#amqqueue{name = QueueName}, restart) ->
%% not a stable situation. Sleep and wait for somebody else to make a
%% move.
-crash_restart(Q = #amqqueue{name = QueueName}) ->
+crash_restart(Q0) when ?is_amqqueue(Q0) ->
+ QueueName = amqqueue:get_name(Q0),
rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
gen_server2:cast(self(), init),
- rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}).
+ Q1 = amqqueue:set_pid(Q0, self()),
+ rabbit_amqqueue_process:init(Q1).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index b41511f874..621f42dafb 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -16,8 +16,10 @@
-module(rabbit_priority_queue).
--include_lib("rabbit.hrl").
--include_lib("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("amqqueue.hrl").
+
-behaviour(rabbit_backing_queue).
%% enabled unconditionally. Disabling priority queueing after
@@ -102,10 +104,14 @@ stop(VHost) ->
%%----------------------------------------------------------------------------
-mutate_name(P, Q = #amqqueue{name = QName = #resource{name = QNameBin}}) ->
- Q#amqqueue{name = QName#resource{name = mutate_name_bin(P, QNameBin)}}.
+mutate_name(P, Q) when ?is_amqqueue(Q) ->
+ Res0 = #resource{name = QNameBin0} = amqqueue:get_name(Q),
+ QNameBin1 = mutate_name_bin(P, QNameBin0),
+ Res1 = Res0#resource{name = QNameBin1},
+ amqqueue:set_name(Q, Res1).
-mutate_name_bin(P, NameBin) -> <<NameBin/binary, 0, P:8>>.
+mutate_name_bin(P, NameBin) ->
+ <<NameBin/binary, 0, P:8>>.
expand_queues(QNames) ->
lists:unzip(
@@ -125,7 +131,8 @@ collapse_recovery(QNames, DupNames, Recovery) ->
end, dict:new(), lists:zip(DupNames, Recovery)),
[dict:fetch(Name, NameToTerms) || Name <- QNames].
-priorities(#amqqueue{arguments = Args}) ->
+priorities(Q) when ?is_amqqueue(Q) ->
+ Args = amqqueue:get_arguments(Q),
Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint],
case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of
{Type, RequestedMax} ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 0c234e1072..6f40637b93 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -16,7 +16,8 @@
-module(rabbit_queue_decorator).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([select/1, set/1, register/2, unregister/1]).
@@ -26,18 +27,18 @@
%%----------------------------------------------------------------------------
--callback startup(rabbit_types:amqqueue()) -> 'ok'.
+-callback startup(amqqueue:amqqueue()) -> 'ok'.
--callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+-callback shutdown(amqqueue:amqqueue()) -> 'ok'.
--callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+-callback policy_changed(amqqueue:amqqueue(), amqqueue:amqqueue()) ->
'ok'.
--callback active_for(rabbit_types:amqqueue()) -> boolean().
+-callback active_for(amqqueue:amqqueue()) -> boolean().
%% called with Queue, MaxActivePriority, IsEmpty
-callback consumer_state_changed(
- rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
+ amqqueue:amqqueue(), integer(), boolean()) -> 'ok'.
%%----------------------------------------------------------------------------
@@ -47,7 +48,9 @@ removed_from_rabbit_registry(_Type) -> ok.
select(Modules) ->
[M || M <- Modules, code:which(M) =/= non_existing].
-set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+set(Q) when ?is_amqqueue(Q) ->
+ Decorators = [D || D <- list(), D:active_for(Q)],
+ amqqueue:set_decorators(Q, Decorators).
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
@@ -61,13 +64,18 @@ unregister(TypeName) ->
[maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
ok.
-maybe_recover(Q = #amqqueue{name = Name,
- decorators = Decs}) ->
- #amqqueue{decorators = Decs1} = set(Q),
- Old = lists:sort(select(Decs)),
+maybe_recover(Q0) when ?is_amqqueue(Q0) ->
+ Name = amqqueue:get_name(Q0),
+ Decs0 = amqqueue:get_decorators(Q0),
+ Q1 = set(Q0),
+ Decs1 = amqqueue:get_decorators(Q1),
+ Old = lists:sort(select(Decs0)),
New = lists:sort(select(Decs1)),
case New of
- Old -> ok;
- _ -> [M:startup(Q) || M <- New -- Old],
- rabbit_amqqueue:update_decorators(Name)
+ Old ->
+ ok;
+ _ ->
+ %% TODO LRB JSP 160169569 should startup be passed Q1 here?
+ [M:startup(Q0) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
end.
diff --git a/src/rabbit_queue_location_client_local.erl b/src/rabbit_queue_location_client_local.erl
index aa07637ab1..4c51bf52d2 100644
--- a/src/rabbit_queue_location_client_local.erl
+++ b/src/rabbit_queue_location_client_local.erl
@@ -17,7 +17,8 @@
-module(rabbit_queue_location_client_local).
-behaviour(rabbit_queue_master_locator).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([description/0, queue_master_location/1]).
@@ -37,4 +38,5 @@
description() ->
[{description, <<"Locate queue master node as the client local node">>}].
-queue_master_location(#amqqueue{}) -> {ok, node()}.
+queue_master_location(Q) when ?is_amqqueue(Q) ->
+ {ok, node()}.
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index a3a3021229..9462e15db9 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -17,7 +17,8 @@
-module(rabbit_queue_location_min_masters).
-behaviour(rabbit_queue_master_locator).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([description/0, queue_master_location/1]).
@@ -37,7 +38,7 @@ description() ->
[{description,
<<"Locate queue master node from cluster node with least bound queues">>}].
-queue_master_location(#amqqueue{} = Q) ->
+queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
QueueNames = rabbit_amqqueue:list_names(),
MastersPerNode = lists:foldl(
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index a92c178dfe..e166fa350d 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -17,7 +17,8 @@
-module(rabbit_queue_location_random).
-behaviour(rabbit_queue_master_locator).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([description/0, queue_master_location/1]).
@@ -37,7 +38,7 @@ description() ->
[{description,
<<"Locate queue master node from cluster in a random manner">>}].
-queue_master_location(#amqqueue{} = Q) ->
+queue_master_location(Q) when ?is_amqqueue(Q) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
MasterNode = lists:nth(RandomPos + 1, Cluster),
diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl
index 68a4bc3e06..2c7bb41b7e 100644
--- a/src/rabbit_queue_location_validator.erl
+++ b/src/rabbit_queue_location_validator.erl
@@ -17,7 +17,8 @@
-module(rabbit_queue_location_validator).
-behaviour(rabbit_policy_validator).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([validate_policy/1, validate_strategy/1]).
@@ -49,7 +50,7 @@ policy(Policy, Q) ->
P -> P
end.
-module(#amqqueue{} = Q) ->
+module(Q) when ?is_amqqueue(Q) ->
case policy(<<"queue-master-locator">>, Q) of
undefined -> no_location_strategy;
Mode -> module(Mode)
diff --git a/src/rabbit_queue_master_location_misc.erl b/src/rabbit_queue_master_location_misc.erl
index 6df7e5db6f..b31fd50192 100644
--- a/src/rabbit_queue_master_location_misc.erl
+++ b/src/rabbit_queue_master_location_misc.erl
@@ -16,7 +16,8 @@
-module(rabbit_queue_master_location_misc).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
-export([lookup_master/2,
lookup_queue/2,
@@ -28,22 +29,25 @@
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
- Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- case rabbit_amqqueue:lookup(Queue) of
- {ok, #amqqueue{pid = Pid}} when is_pid(Pid) ->
+ QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(QueueR) of
+ {ok, Queue} when ?amqqueue_has_valid_pid(Queue) ->
+ Pid = amqqueue:get_pid(Queue),
{ok, node(Pid)};
Error -> Error
end.
lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
is_binary(VHostPath) ->
- Queue = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- case rabbit_amqqueue:lookup(Queue) of
- Reply = {ok, #amqqueue{}} -> Reply;
- Error -> Error
+ QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ case rabbit_amqqueue:lookup(QueueR) of
+ Reply = {ok, Queue} when ?is_amqqueue(Queue) ->
+ Reply;
+ Error ->
+ Error
end.
-get_location(Queue=#amqqueue{})->
+get_location(Queue) when ?is_amqqueue(Queue) ->
Reply1 = case get_location_mod_by_args(Queue) of
_Err1 = {error, _} ->
case get_location_mod_by_policy(Queue) of
@@ -62,7 +66,8 @@ get_location(Queue=#amqqueue{})->
Error -> Error
end.
-get_location_mod_by_args(#amqqueue{arguments=Args}) ->
+get_location_mod_by_args(Queue) when ?is_amqqueue(Queue) ->
+ Args = amqqueue:get_arguments(Queue),
case rabbit_misc:table_lookup(Args, <<"x-queue-master-locator">>) of
{_Type, Strategy} ->
case rabbit_queue_location_validator:validate_strategy(Strategy) of
@@ -72,7 +77,7 @@ get_location_mod_by_args(#amqqueue{arguments=Args}) ->
_ -> {error, "x-queue-master-locator undefined"}
end.
-get_location_mod_by_policy(Queue=#amqqueue{}) ->
+get_location_mod_by_policy(Queue) when ?is_amqqueue(Queue) ->
case rabbit_policy:get(<<"queue-master-locator">> , Queue) of
undefined -> {error, "queue-master-locator policy undefined"};
Strategy ->
@@ -82,7 +87,7 @@ get_location_mod_by_policy(Queue=#amqqueue{}) ->
end
end.
-get_location_mod_by_config(#amqqueue{}) ->
+get_location_mod_by_config(Queue) when ?is_amqqueue(Queue) ->
case application:get_env(rabbit, queue_master_locator) of
{ok, Strategy} ->
case rabbit_queue_location_validator:validate_strategy(Strategy) of
@@ -92,7 +97,7 @@ get_location_mod_by_config(#amqqueue{}) ->
_ -> {error, "queue_master_locator undefined"}
end.
-all_nodes(Queue = #amqqueue{}) ->
+all_nodes(Queue) when ?is_amqqueue(Queue) ->
handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue).
handle_is_mirrored_ha_nodes(false, _Queue) ->
diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl
new file mode 100644
index 0000000000..eba1e2aefa
--- /dev/null
+++ b/src/rabbit_queue_master_locator.erl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_master_locator).
+
+-behaviour(rabbit_registry_class).
+
+-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]).
+
+-callback description() -> [proplists:property()].
+-callback queue_master_location(amqqueue:amqqueue()) ->
+ {'ok', node()} | {'error', term()}.
+
+added_to_rabbit_registry(_Type, _ModuleName) -> ok.
+removed_from_rabbit_registry(_Type) -> ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b4edca7937..45a1c12cac 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -41,6 +41,7 @@
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-include("amqqueue.hrl").
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
-type msg_id() :: non_neg_integer().
@@ -49,8 +50,8 @@
-spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
{'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
{rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
--spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() |
- {'absent', rabbit_types:amqqueue(), atom()}].
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
+ {'absent', amqqueue:amqqueue(), atom()}].
-spec stop(rabbit_types:vhost()) -> 'ok'.
-spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
@@ -59,10 +60,10 @@
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
--spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
--spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
+-spec info(amqqueue:amqqueue()) -> rabbit_types:infos().
+-spec info(amqqueue:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
--spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
+-spec stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}.
-spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'.
-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}.
@@ -93,8 +94,9 @@ init_state({Name, _}, QName = #resource{}) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
- {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} =
- rabbit_amqqueue:lookup(QName),
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ Leader = amqqueue:get_pid(Q),
+ Nodes = amqqueue:get_quorum_nodes(Q),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
@@ -105,14 +107,15 @@ init_state({Name, _}, QName = #resource{}) ->
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
--spec declare(rabbit_types:amqqueue()) ->
- {'new', rabbit_types:amqqueue()} |
- {existing, rabbit_types:amqqueue()}.
-declare(#amqqueue{name = QName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Arguments,
- options = Opts} = Q) ->
+-spec declare(amqqueue:amqqueue()) ->
+ {'new', amqqueue:amqqueue()} |
+ {existing, amqqueue:amqqueue()}.
+declare(Q) when ?amqqueue_is_quorum(Q) ->
+ QName = amqqueue:get_name(Q),
+ Durable = amqqueue:is_durable(Q),
+ AutoDelete = amqqueue:is_auto_delete(Q),
+ Arguments = amqqueue:get_arguments(Q),
+ Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
check_invalid_arguments(QName, Arguments),
check_auto_delete(Q),
@@ -122,9 +125,9 @@ declare(#amqqueue{name = QName,
RaName = qname_to_rname(QName),
Id = {RaName, node()},
Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)),
- NewQ0 = Q#amqqueue{pid = Id,
- quorum_nodes = Nodes},
- case rabbit_amqqueue:internal_declare(NewQ0, false) of
+ NewQ0 = amqqueue:set_pid(Q, Id),
+ NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
+ case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
RaMachine = ra_machine(NewQ),
case ra:start_cluster(RaName, RaMachine,
@@ -150,8 +153,9 @@ declare(#amqqueue{name = QName,
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
-ra_machine_config(Q = #amqqueue{name = QName,
- pid = {Name, _}}) ->
+ra_machine_config(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ {Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
@@ -163,7 +167,8 @@ ra_machine_config(Q = #amqqueue{name = QName,
max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q)}.
-single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
+single_active_consumer_on(Q) ->
+ QArguments = amqqueue:get_arguments(Q),
case rabbit_misc:table_lookup(QArguments, <<"x-single-active-consumer">>) of
{bool, true} -> true;
_ -> false
@@ -200,9 +205,10 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
end.
become_leader(QName, Name) ->
- Fun = fun(Q1) ->
- Q1#amqqueue{pid = {Name, node()},
- state = live}
+ Fun = fun (Q1) ->
+ amqqueue:set_state(
+ amqqueue:set_pid(Q1, {Name, node()}),
+ live)
end,
%% as this function is called synchronously when a ra node becomes leader
%% we need to ensure there is no chance of blocking as else the ra node
@@ -213,7 +219,8 @@ become_leader(QName, Name) ->
rabbit_amqqueue:update(QName, Fun)
end),
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{quorum_nodes = Nodes}} ->
+ {ok, Q0} when ?is_amqqueue(Q0) ->
+ Nodes = amqqueue:get_quorum_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?TICK_TIME)
|| Node <- Nodes, Node =/= node()];
@@ -263,9 +270,10 @@ reductions(Name) ->
recover(Queues) ->
[begin
+ {Name, _} = amqqueue:get_pid(Q0),
+ Nodes = amqqueue:get_quorum_nodes(Q0),
case ra:restart_server({Name, node()}) of
ok ->
-
% queue was restarted, good
ok;
{error, Err}
@@ -298,20 +306,24 @@ recover(Queues) ->
%% So many code paths are dependent on this.
{ok, Q} = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q0),
Q
- end || #amqqueue{pid = {Name, _},
- quorum_nodes = Nodes} = Q0 <- Queues].
+ end || Q0 <- Queues].
stop(VHost) ->
- _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
+ _ = [begin
+ Pid = amqqueue:get_pid(Q),
+ ra:stop_server(Pid)
+ end || Q <- find_quorum_queues(VHost)],
ok.
--spec delete(#amqqueue{},
+-spec delete(amqqueue:amqqueue(),
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
-delete(#amqqueue{type = quorum, pid = {Name, _},
- name = QName, quorum_nodes = QNodes} = Q,
- _IfUnused, _IfEmpty, ActingUser) ->
+delete(Q,
+ _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
+ QNodes = amqqueue:get_quorum_nodes(Q),
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
{ok, ReadyMsgs, _} = stat(Q),
@@ -388,12 +400,14 @@ reject(false, CTag, MsgIds, QState) ->
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
--spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
+-spec basic_get(amqqueue:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(),
rabbit_fifo_client:state()) ->
{'ok', 'empty', rabbit_fifo_client:state()} |
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
-basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck,
- CTag0, QState0) ->
+
+basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
+ QName = amqqueue:get_name(Q),
+ Id = amqqueue:get_pid(Q),
CTag = quorum_ctag(CTag0),
Settlement = case NoAck of
true ->
@@ -413,17 +427,20 @@ basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck,
{error, timeout}
end.
--spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
+-spec basic_consume(amqqueue:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
ConsumerPrefetchCount :: non_neg_integer(),
rabbit_types:ctag(), ExclusiveConsume :: boolean(),
Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-basic_consume(#amqqueue{name = QName, pid = QPid, type = quorum} = Q, NoAck, ChPid,
+
+basic_consume(Q, NoAck, ChPid,
ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
- ActingUser, OkMsg, QState0) ->
+ ActingUser, OkMsg, QState0) when ?amqqueue_is_quorum(Q) ->
%% TODO: validate consumer arguments
%% currently quorum queues do not support any arguments
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
maybe_send_reply(ChPid, OkMsg),
ConsumerTag = quorum_ctag(ConsumerTag0),
%% A prefetch count of 0 means no limitation,
@@ -514,9 +531,12 @@ requeue(ConsumerTag, MsgIds, QState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
cleanup_data_dir() ->
- Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
- <- rabbit_amqqueue:list_by_type(quorum),
- lists:member(node(), Nodes)],
+ Names = [begin
+ {Name, _} = amqqueue:get_pid(Q),
+ Name
+ end
+ || Q <- rabbit_amqqueue:list_by_type(quorum),
+ lists:member(node(), amqqueue:get_quorum_nodes(Q))],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Names)],
@@ -552,9 +572,11 @@ status(Vhost, QueueName) ->
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
RName = qname_to_rname(QName),
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{type = classic}} ->
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
- {ok, #amqqueue{pid = {_, Leader}, quorum_nodes = Nodes}} ->
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ {_, Leader} = amqqueue:get_pid(Q),
+ Nodes = amqqueue:get_quorum_nodes(Q),
Info = [{leader, Leader}, {members, Nodes}],
case ets:lookup(ra_state, RName) of
[{_, State}] ->
@@ -569,9 +591,10 @@ status(Vhost, QueueName) ->
add_member(VHost, Name, Node) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{type = classic}} ->
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
- {ok, #amqqueue{quorum_nodes = QNodes} = Q} ->
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ QNodes = amqqueue:get_quorum_nodes(Q),
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
false ->
{error, node_not_running};
@@ -587,8 +610,10 @@ add_member(VHost, Name, Node) ->
E
end.
-add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName,
- quorum_nodes = QNodes} = Q, Node) ->
+add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
+ {RaName, _} = ServerRef = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
+ QNodes = amqqueue:get_quorum_nodes(Q),
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
case ra:start_server(RaName, ServerId, ra_machine(Q),
@@ -597,9 +622,10 @@ add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName,
case ra:add_member(ServerRef, ServerId) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
- Q1#amqqueue{quorum_nodes =
- [Node | Q1#amqqueue.quorum_nodes],
- pid = Leader}
+ Q2 = amqqueue:set_quorum_nodes(
+ Q1,
+ [Node | amqqueue:get_quorum_nodes(Q1)]),
+ amqqueue:set_pid(Q2, Leader)
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
@@ -615,9 +641,10 @@ add_member(#amqqueue{pid = {RaName, _} = ServerRef, name = QName,
delete_member(VHost, Name, Node) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{type = classic}} ->
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
- {ok, #amqqueue{quorum_nodes = QNodes} = Q} ->
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ QNodes = amqqueue:get_quorum_nodes(Q),
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
false ->
{error, node_not_running};
@@ -633,13 +660,16 @@ delete_member(VHost, Name, Node) ->
E
end.
-delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
+delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
+ QName = amqqueue:get_name(Q),
+ {RaName, _} = amqqueue:get_pid(Q),
ServerId = {RaName, Node},
case ra:leave_and_delete_server(ServerId) of
ok ->
Fun = fun(Q1) ->
- Q1#amqqueue{quorum_nodes =
- lists:delete(Node, Q1#amqqueue.quorum_nodes)}
+ amqqueue:set_quorum_nodes(
+ Q1,
+ lists:delete(Node, amqqueue:get_quorum_nodes(Q1)))
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
@@ -654,16 +684,18 @@ dlx_mfa(Q) ->
fun res_arg/2, Q), Q),
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
fun res_arg/2, Q),
- {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
+ {?MODULE, dead_letter_publish, [DLX, DLXRKey, amqqueue:get_name(Q)]}.
init_dlx(undefined, _Q) ->
undefined;
-init_dlx(DLX, #amqqueue{name = QName}) ->
+init_dlx(DLX, Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
rabbit_misc:r(QName, exchange, DLX).
res_arg(_PolVal, ArgVal) -> ArgVal.
-args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
+args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) ->
+ Args = amqqueue:get_arguments(Q),
AName = <<"x-", Name/binary>>,
case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of
{undefined, undefined} -> undefined;
@@ -693,29 +725,32 @@ find_quorum_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{vhost = VH,
- pid = Pid,
- type = quorum}
- <- mnesia:table(rabbit_durable_queue),
- VH =:= VHost,
- qnode(Pid) == Node]))
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ ?amqqueue_is_quorum(Q),
+ amqqueue:get_vhost(Q) =:= VHost,
+ amqqueue:qnode(Q) == Node]))
end).
-i(name, #amqqueue{name = Name}) -> Name;
-i(durable, #amqqueue{durable = Dur}) -> Dur;
-i(auto_delete, #amqqueue{auto_delete = AD}) -> AD;
-i(arguments, #amqqueue{arguments = Args}) -> Args;
-i(pid, #amqqueue{pid = {Name, _}}) -> whereis(Name);
-i(messages, #amqqueue{pid = {Name, _}}) ->
+i(name, Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q);
+i(durable, Q) when ?is_amqqueue(Q) -> amqqueue:is_durable(Q);
+i(auto_delete, Q) when ?is_amqqueue(Q) -> amqqueue:is_auto_delete(Q);
+i(arguments, Q) when ?is_amqqueue(Q) -> amqqueue:get_arguments(Q);
+i(pid, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
+ whereis(Name);
+i(messages, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
quorum_messages(Name);
-i(messages_ready, #amqqueue{name = QName}) ->
+i(messages_ready, Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
case ets:lookup(queue_coarse_metrics, QName) of
[{_, MR, _, _, _}] ->
MR;
[] ->
0
end;
-i(messages_unacknowledged, #amqqueue{name = QName}) ->
+i(messages_unacknowledged, Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
case ets:lookup(queue_coarse_metrics, QName) of
[{_, _, MU, _, _}] ->
MU;
@@ -737,14 +772,16 @@ i(effective_policy_definition, Q) ->
undefined -> [];
Def -> Def
end;
-i(consumers, #amqqueue{name = QName}) ->
+i(consumers, Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
case ets:lookup(queue_metrics, QName) of
[{_, M, _}] ->
proplists:get_value(consumers, M, 0);
[] ->
0
end;
-i(memory, #amqqueue{pid = {Name, _}}) ->
+i(memory, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
try
{memory, M} = process_info(whereis(Name), memory),
M
@@ -752,33 +789,38 @@ i(memory, #amqqueue{pid = {Name, _}}) ->
error:badarg ->
0
end;
-i(state, #amqqueue{pid = {Name, Node}}) ->
+i(state, Q) when ?is_amqqueue(Q) ->
+ {Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
{badrpc, _} -> down;
State -> State
end;
-i(local_state, #amqqueue{pid = {Name, _}}) ->
+i(local_state, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
case ets:lookup(ra_state, Name) of
[{_, State}] -> State;
_ -> not_member
end;
-i(garbage_collection, #amqqueue{pid = {Name, _}}) ->
+i(garbage_collection, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
try
rabbit_misc:get_gc_info(whereis(Name))
catch
error:badarg ->
[]
end;
-i(members, #amqqueue{quorum_nodes = Nodes}) ->
- Nodes;
+i(members, Q) when ?is_amqqueue(Q) ->
+ amqqueue:get_quorum_nodes(Q);
i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
-i(open_files, #amqqueue{pid = {Name, _},
- quorum_nodes = Nodes}) ->
+i(open_files, Q) when ?is_amqqueue(Q) ->
+ {Name, _} = amqqueue:get_pid(Q),
+ Nodes = amqqueue:get_quorum_nodes(Q),
{Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
lists:flatten(Data);
-i(single_active_consumer_pid, #amqqueue{pid = QPid}) ->
+i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
{ok, {_, SacResult}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1),
case SacResult of
@@ -787,7 +829,8 @@ i(single_active_consumer_pid, #amqqueue{pid = QPid}) ->
_ ->
''
end;
-i(single_active_consumer_ctag, #amqqueue{pid = QPid}) ->
+i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
{ok, {_, SacResult}, _} = ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1),
case SacResult of
@@ -807,17 +850,20 @@ open_files(Name) ->
end
end.
-leader(#amqqueue{pid = {Name, Leader}}) ->
+leader(Q) when ?is_amqqueue(Q) ->
+ {Name, Leader} = amqqueue:get_pid(Q),
case is_process_alive(Name, Leader) of
true -> Leader;
false -> ''
end.
-online(#amqqueue{quorum_nodes = Nodes,
- pid = {Name, _Leader}}) ->
+online(Q) when ?is_amqqueue(Q) ->
+ Nodes = amqqueue:get_quorum_nodes(Q),
+ {Name, _} = amqqueue:get_pid(Q),
[Node || Node <- Nodes, is_process_alive(Name, Node)].
-format(#amqqueue{quorum_nodes = Nodes} = Q) ->
+format(Q) when ?is_amqqueue(Q) ->
+ Nodes = amqqueue:get_quorum_nodes(Q),
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
@@ -839,11 +885,6 @@ quorum_ctag(Other) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-qnode(QPid) when is_pid(QPid) ->
- node(QPid);
-qnode({_, Node}) ->
- Node.
-
check_invalid_arguments(QueueName, Args) ->
Keys = [<<"x-expires">>, <<"x-message-ttl">>,
<<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
@@ -856,7 +897,8 @@ check_invalid_arguments(QueueName, Args) ->
end || Key <- Keys],
ok.
-check_auto_delete(#amqqueue{auto_delete = true, name = Name}) ->
+check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) ->
+ Name = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
precondition_failed,
"invalid property 'auto-delete' for ~s",
@@ -864,18 +906,19 @@ check_auto_delete(#amqqueue{auto_delete = true, name = Name}) ->
check_auto_delete(_) ->
ok.
-check_exclusive(#amqqueue{exclusive_owner = none}) ->
+check_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
ok;
-check_exclusive(#amqqueue{name = Name}) ->
+check_exclusive(Q) when ?is_amqqueue(Q) ->
+ Name = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
precondition_failed,
"invalid property 'exclusive-owner' for ~s",
[rabbit_misc:rs(Name)]).
-check_non_durable(#amqqueue{durable = true}) ->
+check_non_durable(Q) when ?amqqueue_is_durable(Q) ->
ok;
-check_non_durable(#amqqueue{name = Name,
- durable = false}) ->
+check_non_durable(Q) when not ?amqqueue_is_durable(Q) ->
+ Name = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
precondition_failed,
"invalid property 'non-durable' for ~s",
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 1fab94fe34..272e3966b2 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -24,7 +24,7 @@
%% for testing purposes
-export([definitions/0]).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
%%----------------------------------------------------------------------------
-type retry() :: boolean().
@@ -349,13 +349,13 @@ definitions() ->
{match, #runtime_parameters{_='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)},
+ {attributes, amqqueue:fields()},
{disc_copies, [node()]},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]},
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]},
{rabbit_queue,
[{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ {attributes, amqqueue:fields()},
+ {match, amqqueue:pattern_match_on_name(queue_name_match())}]}]
++ gm:table_definitions()
++ mirrored_supervisor:table_definitions().
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6be812dad3..76a8b59e31 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -60,8 +60,6 @@
-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
-rabbit_upgrade({topic_permission, mnesia, []}).
-rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}).
--rabbit_upgrade({queue_type, mnesia, [queue_options]}).
--rabbit_upgrade({queue_quorum_nodes, mnesia, [queue_type]}).
-rabbit_upgrade({exchange_options, mnesia, [operator_policies]}).
%% TODO: move that to feature flags
@@ -103,8 +101,6 @@
-spec operator_policies() -> 'ok'.
-spec queue_vhost_field() -> 'ok'.
-spec queue_options() -> 'ok'.
--spec queue_type() -> 'ok'.
--spec queue_quorum_nodes() -> 'ok'.
-spec exchange_options() -> 'ok'.
-spec remove_explicit_default_exchange_bindings() -> 'ok'.
@@ -584,47 +580,6 @@ queue_options(Table) ->
sync_slave_pids, recoverable_slaves, policy, operator_policy,
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options]).
-queue_type() ->
- ok = queue_type(rabbit_queue),
- ok = queue_type(rabbit_durable_queue),
- ok.
-
-queue_type(Table) ->
- transform(
- Table,
- fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
- Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
- State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options}) ->
- {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
- Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
- State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, classic}
- end,
- [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
- sync_slave_pids, recoverable_slaves, policy, operator_policy,
- gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options,
- type]).
-
-queue_quorum_nodes() ->
- ok = queue_quorum_nodes(rabbit_queue),
- ok = queue_quorum_nodes(rabbit_durable_queue),
- ok.
-
-queue_quorum_nodes(Table) ->
- transform(
- Table,
- fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
- Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
- State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, Type}) ->
- {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
- Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
- State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, Type,
- undefined}
- end,
- [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
- sync_slave_pids, recoverable_slaves, policy, operator_policy,
- gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options,
- type, quorum_nodes]).
-
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4da073a518..f0ad0eb6e1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -356,8 +356,9 @@
-define(QUEUE, lqueue).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("amqqueue.hrl").
%%----------------------------------------------------------------------------
@@ -532,8 +533,9 @@ init(Queue, Recover, Callback) ->
fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+ QueueName = amqqueue:get_name(Q),
+ IsDurable = amqqueue:is_durable(Q),
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
VHost = QueueName#resource.virtual_host,
@@ -547,8 +549,9 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, VHost), VHost);
%% We can be recovering a transient queue if it crashed
-init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+ QueueName = amqqueue:get_name(Q),
+ IsDurable = amqqueue:is_durable(Q),
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
@@ -620,7 +623,8 @@ delete_and_terminate(_Reason, State) ->
rabbit_msg_store:client_delete_and_terminate(MSCStateT),
a(State2 #vqstate { msg_store_clients = undefined }).
-delete_crashed(#amqqueue{name = QName}) ->
+delete_crashed(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
ok = rabbit_queue_index:erase(QName).
purge(State = #vqstate { len = Len }) ->
@@ -2825,7 +2829,7 @@ move_messages_to_vhost_store(Queues) ->
%% Move the queue index for each persistent queue to the new store
lists:foreach(
fun(Queue) ->
- #amqqueue{name = QueueName} = Queue,
+ QueueName = amqqueue:get_name(Queue),
rabbit_queue_index:move_to_per_vhost_stores(QueueName)
end,
Queues),
@@ -2938,17 +2942,16 @@ list_persistent_queues() ->
Node = node(),
mnesia:async_dirty(
fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
- pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node,
- mnesia:read(rabbit_queue, Name, read) =:= []]))
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ ?amqqueue_is_classic(Q),
+ amqqueue:qnode(Q) == Node,
+ mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []]))
end).
read_old_recovery_terms([]) ->
{[], [], ?EMPTY_START_FUN_STATE};
read_old_recovery_terms(Queues) ->
- QueueNames = [Name || #amqqueue{name = Name} <- Queues],
+ QueueNames = [amqqueue:get_name(Q) || Q <- Queues],
{AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames),
Refs = [Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index cf12d04ce8..75f96ac1eb 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -16,7 +16,7 @@
-module(rabbit_vhost).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -72,8 +72,8 @@ recover(VHost) ->
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
Qs = rabbit_amqqueue:recover(VHost),
- ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
- [QName || #amqqueue{name = QName} <- Qs]),
+ QNames = [amqqueue:get_name(Q) || Q <- Qs],
+ ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
ok = rabbit_amqqueue:start(Qs),
%% Start queue mirrors.
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
@@ -142,8 +142,10 @@ delete(VHostPath, ActingUser) ->
%% notifications which must be sent outside the TX
rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end,
- [assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) ||
- #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
+ [begin
+ Name = amqqueue:get_name(Q),
+ assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser)
+ end || Q <- rabbit_amqqueue:list(VHostPath)],
[assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
Funs = rabbit_misc:execute_mnesia_transaction(
@@ -226,7 +228,8 @@ assert_benign({error, not_found}, _) -> ok;
assert_benign({error, {absent, Q, _}}, ActingUser) ->
%% Removing the mnesia entries here is safe. If/when the down node
%% restarts, it will clear out the on-disk storage of the queue.
- case rabbit_amqqueue:internal_delete(Q#amqqueue.name, ActingUser) of
+ QName = amqqueue:get_name(Q),
+ case rabbit_amqqueue:internal_delete(QName, ActingUser) of
ok -> ok;
{error, not_found} -> ok
end.
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
new file mode 100644
index 0000000000..05a049c9bb
--- /dev/null
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -0,0 +1,302 @@
+-module(amqqueue_backward_compatibility_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("amqqueue.hrl").
+
+-export([all/0,
+ groups/0,
+ init_per_suite/2,
+ end_per_suite/2,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ new_amqqueue_v1_is_amqqueue/1,
+ new_amqqueue_v2_is_amqqueue/1,
+ random_term_is_not_amqqueue/1,
+
+ amqqueue_v1_is_durable/1,
+ amqqueue_v2_is_durable/1,
+ random_term_is_not_durable/1,
+
+ amqqueue_v1_state_matching/1,
+ amqqueue_v2_state_matching/1,
+ random_term_state_matching/1,
+
+ amqqueue_v1_type_matching/1,
+ amqqueue_v2_type_matching/1,
+ random_term_type_matching/1,
+
+ upgrade_v1_to_v2/1
+ ]).
+
+-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m,
+ n, o, p, q, r, s, t, u, v, w, x, y, z}).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue,
+ new_amqqueue_v2_is_amqqueue,
+ random_term_is_not_amqqueue,
+ amqqueue_v1_is_durable,
+ amqqueue_v2_is_durable,
+ random_term_is_not_durable,
+ amqqueue_v1_state_matching,
+ amqqueue_v2_state_matching,
+ random_term_state_matching,
+ amqqueue_v1_type_matching,
+ amqqueue_v2_type_matching,
+ random_term_type_matching]}
+ ].
+
+init_per_suite(_, Config) -> Config.
+end_per_suite(_, Config) -> Config.
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+new_amqqueue_v1_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v1(Queue)),
+ ?assert(not ?is_amqqueue_v2(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+new_amqqueue_v2_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2),
+ Queue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v2(Queue)),
+ ?assert(not ?is_amqqueue_v1(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+random_term_is_not_amqqueue(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?is_amqqueue(Term)),
+ ?assert(not ?is_amqqueue_v2(Term)),
+ ?assert(not ?is_amqqueue_v1(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+amqqueue_v2_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+random_term_is_not_durable(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_durable(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+amqqueue_v2_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+random_term_state_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_state_is(Term, live)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)).
+
+amqqueue_v2_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ ClassicQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_is_classic(ClassicQueue)),
+ ?assert(amqqueue:is_classic(ClassicQueue)),
+ ?assert(not ?amqqueue_is_quorum(ClassicQueue)),
+ ?assert(not amqqueue:is_quorum(ClassicQueue)),
+ QuorumQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ quorum),
+ ?assert(not ?amqqueue_is_classic(QuorumQueue)),
+ ?assert(not amqqueue:is_classic(QuorumQueue)),
+ ?assert(?amqqueue_is_quorum(QuorumQueue)),
+ ?assert(amqqueue:is_quorum(QuorumQueue)).
+
+random_term_type_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_classic(Term)),
+ ?assert(not ?amqqueue_is_quorum(Term)),
+ ?assertException(error, function_clause, amqqueue:is_classic(Term)),
+ ?assertException(error, function_clause, amqqueue:is_quorum(Term)).
+
+%% -------------------------------------------------------------------
+
+upgrade_v1_to_v2(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ OldQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue_v1(OldQueue)),
+ ?assert(not ?is_amqqueue_v2(OldQueue)),
+ NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue),
+ ?assert(not ?is_amqqueue_v1(NewQueue)),
+ ?assert(?is_amqqueue_v2(NewQueue)).
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 433bc66bff..bc9b2c8ced 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile(export_all).
@@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) ->
bq_variable_queue_delete_msg_store_files_callback1(Config) ->
ok = restart_msg_store_empty(),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(
- queue_name(Config,
- <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
publish_and_confirm(Q, Payload, Count),
@@ -718,9 +718,10 @@ bq_queue_recover(Config) ->
bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
@@ -736,7 +737,8 @@ bq_queue_recover1(Config) ->
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
- fun (Q1 = #amqqueue { pid = QPid1 }) ->
+ fun (Q1) when ?is_amqqueue(Q1) ->
+ QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
@@ -752,7 +754,9 @@ bq_queue_recover1(Config) ->
passed.
%% Return the PID of the given queue's supervisor.
-get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) ->
+get_queue_sup_pid(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
VHost = QName#resource.virtual_host,
{ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
Sups = supervisor:which_children(AmqSup),
@@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
end, {VQ, []}, lists:seq(1, Count)).
test_amqqueue(QName, Durable) ->
- (rabbit_amqqueue:pseudo_queue(QName, self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(QName, self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index 9bfa0ae07a..77da6133d8 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile([export_all]).
@@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node),
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, consumers_all, [VHost]).
-get_amqqueue(Q, []) -> throw({not_found, Q});
-get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ;
-get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem).
+get_amqqueue(QName0, []) ->
+ throw({not_found, QName0});
+get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
+ QName1 = amqqueue:get_name(Q),
+ compare_amqqueue(QName0, QName1, Q, Rem).
+
+compare_amqqueue(QName, QName, Q, _Rem) ->
+ Q;
+compare_amqqueue(QName, _, _, Rem) ->
+ get_amqqueue(QName, Rem).
qconfig(Ch, Name, Ex, Consume, Deliver) ->
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index c52dc9ef64..dc30825f8c 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("include/amqqueue.hrl").
-compile(export_all).
@@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
Self = self(),
Pid = spawn(SecondaryNode,
fun () ->
- {new, #amqqueue{name = QueueName, pid = QPid}} =
- rabbit_amqqueue:declare(QueueName, false, false, [],
- none, <<"acting-user">>),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QueueName = ?amqqueue_field_name(Q),
+ QPid = ?amqqueue_field_pid(Q),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
@@ -269,12 +270,12 @@ must_exit(Fun) ->
end.
dead_queue_loop(QueueName, OldPid) ->
- {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
- <<"acting-user">>),
- case Q#amqqueue.pid of
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QPid = ?amqqueue_field_pid(Q),
+ case QPid of
OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid);
- _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ _ -> true = rabbit_misc:is_process_alive(QPid),
Q
end.
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 120257feb9..5ae2fb687c 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -315,14 +315,14 @@ forget_offline_removes_things(Config) ->
forget_promotes_offline_slave(Config) ->
[A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
- Q = <<"mirrored-queue">>,
- declare(ACh, Q),
- set_ha_policy(Config, Q, A, [B, C]),
- set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves
+ QName = <<"mirrored-queue">>,
+ declare(ACh, QName),
+ set_ha_policy(Config, QName, A, [B, C]),
+ set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves
%% Publish and confirm
amqp_channel:call(ACh, #'confirm.select'{}),
- amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q},
+ amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
amqp_channel:wait_for_confirms(ACh),
@@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, D),
DCh2 = rabbit_ct_client_helpers:open_channel(Config, D),
- #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q),
+ #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName),
ok.
-set_ha_policy(Config, Q, Master, Slaves) ->
+set_ha_policy(Config, QName, Master, Slaves) ->
Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]],
- rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q,
- {<<"nodes">>, Nodes}),
- await_slaves(Q, Master, Slaves).
-
-await_slaves(Q, Master, Slaves) ->
- {ok, #amqqueue{pid = MPid,
- slave_pids = SPids}} =
- rpc:call(Master, rabbit_amqqueue, lookup,
- [rabbit_misc:r(<<"/">>, queue, Q)]),
- ActMaster = node(MPid),
+ HaPolicy = {<<"nodes">>, Nodes},
+ rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy),
+ await_slaves(QName, Master, Slaves).
+
+await_slaves(QName, Master, Slaves) ->
+ await_slaves_0(QName, Master, Slaves, 10).
+
+await_slaves_0(QName, Master, Slaves0, Tries) ->
+ {ok, Queue} = await_slaves_lookup_queue(QName, Master),
+ SPids = amqqueue:get_slave_pids(Queue),
+ ActMaster = amqqueue:qnode(Queue),
ActSlaves = lists:usort([node(P) || P <- SPids]),
- case {Master, lists:usort(Slaves)} of
- {ActMaster, ActSlaves} -> ok;
- _ -> timer:sleep(100),
- await_slaves(Q, Master, Slaves)
+ Slaves1 = lists:usort(Slaves0),
+ await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries).
+
+await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) ->
+ error({timeout_waiting_for_slaves, QName});
+await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) ->
+ case {Master, Slaves} of
+ {ActMaster, ActSlaves} ->
+ ok;
+ _ ->
+ timer:sleep(250),
+ await_slaves_0(QName, Master, Slaves, Tries - 1)
+ end.
+
+await_slaves_lookup_queue(QName, Master) ->
+ await_slaves_lookup_queue(QName, Master, 10).
+
+await_slaves_lookup_queue(QName, _Master, 0) ->
+ error({timeout_looking_up_queue, QName});
+await_slaves_lookup_queue(QName, Master, Tries) ->
+ RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)],
+ case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of
+ {error, not_found} ->
+ timer:sleep(250),
+ await_slaves_lookup_queue(QName, Master, Tries - 1);
+ {ok, Q} ->
+ {ok, Q}
end.
force_boot(Config) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 2d91083abc..7b8ec91346 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -217,9 +217,10 @@ kill_queue(Node, QName) ->
await_new_pid(Node, QName, Pid1).
queue_pid(Node, QName) ->
- #amqqueue{pid = QPid,
- state = State,
- name = #resource{virtual_host = VHost}} = lookup(Node, QName),
+ Q = lookup(Node, QName),
+ QPid = amqqueue:get_pid(Q),
+ State = amqqueue:get_state(Q),
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 9db866e3c6..8a0ab98241 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_head_message_timestamp-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(head_message_timestamp, BQS1),
@@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) ->
ram_duration(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
{_Duration1, BQS2} = PQ:ram_duration(BQS1),
BQS3 = PQ:set_ram_duration_target(infinity, BQS2),
BQS4 = PQ:set_ram_duration_target(1, BQS3),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 9f40bc7b0c..0f9010f204 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -2228,11 +2228,10 @@ assert_queue_type(Server, Q, Expected) ->
Actual = get_queue_type(Server, Q),
Expected = Actual.
-get_queue_type(Server, Q) ->
- QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
- {ok, AMQQueue} =
- rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
- AMQQueue#amqqueue.type.
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
wait_for_messages(Config, Stats) ->
wait_for_messages(Config, lists:sort(Stats), 60).
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 7ae43aa7e1..42ac863ead 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -364,11 +364,9 @@ cluster_queue_metrics(Config) ->
% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
- [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
- ets, lookup,
- [rabbit_queue, Name]),
- ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
- sync_mirrors, [QPid]),
+ [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]),
+ QPid = amqqueue:get_pid(Q),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
% Check ETS table for data
wait_for(fun () ->
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index e3e282f233..d2db382e30 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -568,7 +568,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d028ed3ccf..f4f4971517 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -283,8 +283,7 @@ wait_for_confirms(Unconfirmed) ->
end.
test_amqqueue(Durable) ->
- (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
@@ -709,7 +708,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
@@ -958,7 +957,7 @@ confirms1(_Config) ->
QName2 = DeclareBindDurableQueue(),
%% Get the first one's pid (we'll crash it later)
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
- QPid1 = Q1#amqqueue.pid,
+ QPid1 = amqqueue:get_pid(Q1),
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 2de5819b54..5b6a6bd6ff 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- #amqqueue{sync_slave_pids = [Pid]} = Q,
+ [Pid] = amqqueue:get_sync_slave_pids(Q),
Node1 = node(Pid),