diff options
Diffstat (limited to 'deps/rabbit/src/amqqueue.erl')
-rw-r--r-- | deps/rabbit/src/amqqueue.erl | 762 |
1 files changed, 762 insertions, 0 deletions
diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl new file mode 100644 index 0000000000..3415ebd073 --- /dev/null +++ b/deps/rabbit/src/amqqueue.erl @@ -0,0 +1,762 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(amqqueue). %% Could become amqqueue_v2 in the future. + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("amqqueue.hrl"). + +-export([new/8, + new/9, + new_with_version/9, + new_with_version/10, + fields/0, + fields/1, + field_vhost/0, + record_version_to_use/0, + upgrade/1, + upgrade_to/2, + % arguments + get_arguments/1, + set_arguments/2, + % decorators + get_decorators/1, + set_decorators/2, + % exclusive_owner + get_exclusive_owner/1, + % gm_pids + get_gm_pids/1, + set_gm_pids/2, + get_leader/1, + % name (#resource) + get_name/1, + set_name/2, + % operator_policy + get_operator_policy/1, + set_operator_policy/2, + get_options/1, + % pid + get_pid/1, + set_pid/2, + % policy + get_policy/1, + set_policy/2, + % policy_version + get_policy_version/1, + set_policy_version/2, + % type_state + get_type_state/1, + set_type_state/2, + % recoverable_slaves + get_recoverable_slaves/1, + set_recoverable_slaves/2, + % slave_pids + get_slave_pids/1, + set_slave_pids/2, + % slave_pids_pending_shutdown + get_slave_pids_pending_shutdown/1, + set_slave_pids_pending_shutdown/2, + % state + get_state/1, + set_state/2, + % sync_slave_pids + get_sync_slave_pids/1, + set_sync_slave_pids/2, + get_type/1, + get_vhost/1, + is_amqqueue/1, + is_auto_delete/1, + is_durable/1, + is_classic/1, + is_quorum/1, + pattern_match_all/0, + pattern_match_on_name/1, + pattern_match_on_type/1, + reset_mirroring_and_decorators/1, + set_immutable/1, + qnode/1, + macros/0]). + +-define(record_version, amqqueue_v2). +-define(is_backwards_compat_classic(T), + (T =:= classic orelse T =:= ?amqqueue_v1_type)). + +-record(amqqueue, { + name :: rabbit_amqqueue:name() | '_', %% immutable + durable :: boolean() | '_', %% immutable + auto_delete :: boolean() | '_', %% immutable + exclusive_owner = none :: pid() | none | '_', %% immutable + arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable + pid :: pid() | ra_server_id() | none | '_', %% durable (just so we + %% know home node) + slave_pids = [] :: [pid()] | none | '_', %% transient + sync_slave_pids = [] :: [pid()] | none| '_',%% transient + recoverable_slaves = [] :: [atom()] | none | '_', %% durable + policy :: binary() | none | undefined | '_', %% durable, implicit + %% update as above + operator_policy :: binary() | none | undefined | '_', %% durable, + %% implicit + %% update + %% as above + gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient + decorators :: [atom()] | none | undefined | '_', %% transient, + %% recalculated + %% as above + state = live :: atom() | none | '_', %% durable (have we crashed?) + policy_version = 0 :: non_neg_integer() | '_', + slave_pids_pending_shutdown = [] :: [pid()] | '_', + vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index + options = #{} :: map() | '_', + type = ?amqqueue_v1_type :: module() | '_', + type_state = #{} :: map() | '_' + }). + +-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2(). +-type amqqueue_v2() :: #amqqueue{ + name :: rabbit_amqqueue:name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: pid() | none, + arguments :: rabbit_framing:amqp_table(), + pid :: pid() | ra_server_id() | none, + slave_pids :: [pid()] | none, + sync_slave_pids :: [pid()] | none, + recoverable_slaves :: [atom()] | none, + policy :: binary() | none | undefined, + operator_policy :: binary() | none | undefined, + gm_pids :: [{pid(), pid()}] | none, + decorators :: [atom()] | none | undefined, + state :: atom() | none, + policy_version :: non_neg_integer(), + slave_pids_pending_shutdown :: [pid()], + vhost :: rabbit_types:vhost() | undefined, + options :: map(), + type :: atom(), + type_state :: #{} + }. + +-type ra_server_id() :: {Name :: atom(), Node :: node()}. + +-type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() | + amqqueue_v2_pattern(). +-type amqqueue_v2_pattern() :: #amqqueue{ + name :: rabbit_amqqueue:name() | '_', + durable :: '_', + auto_delete :: '_', + exclusive_owner :: '_', + arguments :: '_', + pid :: '_', + slave_pids :: '_', + sync_slave_pids :: '_', + recoverable_slaves :: '_', + policy :: '_', + operator_policy :: '_', + gm_pids :: '_', + decorators :: '_', + state :: '_', + policy_version :: '_', + slave_pids_pending_shutdown :: '_', + vhost :: '_', + options :: '_', + type :: atom() | '_', + type_state :: '_' + }. + +-export_type([amqqueue/0, + amqqueue_v2/0, + amqqueue_pattern/0, + amqqueue_v2_pattern/0, + ra_server_id/0]). + +-spec new(rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new(Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new(rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + atom()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) andalso + is_atom(Type) -> + case record_version_to_use() of + ?record_version -> + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type); + _ -> + amqqueue_v1:new( + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + end. + +-spec new_with_version +(amqqueue_v1 | amqqueue_v2, + rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new_with_version(RecordVersion, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version(RecordVersion, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new_with_version +(amqqueue_v1 | amqqueue_v2, + rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + atom()) -> amqqueue(). + +new_with_version(?record_version, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) andalso + is_atom(Type) -> + #amqqueue{name = Name, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = Pid, + vhost = VHost, + options = Options, + type = ensure_type_compat(Type)}; +new_with_version(Version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type) + when ?is_backwards_compat_classic(Type) -> + amqqueue_v1:new_with_version( + Version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + +-spec is_amqqueue(any()) -> boolean(). + +is_amqqueue(#amqqueue{}) -> true; +is_amqqueue(Queue) -> amqqueue_v1:is_amqqueue(Queue). + +-spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2. + +record_version_to_use() -> + case rabbit_feature_flags:is_enabled(quorum_queue) of + true -> ?record_version; + false -> amqqueue_v1:record_version_to_use() + end. + +-spec upgrade(amqqueue()) -> amqqueue(). + +upgrade(#amqqueue{} = Queue) -> Queue; +upgrade(OldQueue) -> upgrade_to(record_version_to_use(), OldQueue). + +-spec upgrade_to +(amqqueue_v2, amqqueue()) -> amqqueue_v2(); +(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1(). + +upgrade_to(?record_version, #amqqueue{} = Queue) -> + Queue; +upgrade_to(?record_version, OldQueue) -> + Fields = erlang:tuple_to_list(OldQueue) ++ [?amqqueue_v1_type, + undefined], + #amqqueue{} = erlang:list_to_tuple(Fields); +upgrade_to(Version, OldQueue) -> + amqqueue_v1:upgrade_to(Version, OldQueue). + +% arguments + +-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). + +get_arguments(#amqqueue{arguments = Args}) -> + Args; +get_arguments(Queue) -> + amqqueue_v1:get_arguments(Queue). + +-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). + +set_arguments(#amqqueue{} = Queue, Args) -> + Queue#amqqueue{arguments = Args}; +set_arguments(Queue, Args) -> + amqqueue_v1:set_arguments(Queue, Args). + +% decorators + +-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. + +get_decorators(#amqqueue{decorators = Decorators}) -> + Decorators; +get_decorators(Queue) -> + amqqueue_v1:get_decorators(Queue). + +-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). + +set_decorators(#amqqueue{} = Queue, Decorators) -> + Queue#amqqueue{decorators = Decorators}; +set_decorators(Queue, Decorators) -> + amqqueue_v1:set_decorators(Queue, Decorators). + +-spec get_exclusive_owner(amqqueue()) -> pid() | none. + +get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> + Owner; +get_exclusive_owner(Queue) -> + amqqueue_v1:get_exclusive_owner(Queue). + +% gm_pids + +-spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. + +get_gm_pids(#amqqueue{gm_pids = GMPids}) -> + GMPids; +get_gm_pids(Queue) -> + amqqueue_v1:get_gm_pids(Queue). + +-spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). + +set_gm_pids(#amqqueue{} = Queue, GMPids) -> + Queue#amqqueue{gm_pids = GMPids}; +set_gm_pids(Queue, GMPids) -> + amqqueue_v1:set_gm_pids(Queue, GMPids). + +-spec get_leader(amqqueue_v2()) -> node(). + +get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. + +% operator_policy + +-spec get_operator_policy(amqqueue()) -> binary() | none | undefined. + +get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy; +get_operator_policy(Queue) -> amqqueue_v1:get_operator_policy(Queue). + +-spec set_operator_policy(amqqueue(), binary() | none | undefined) -> + amqqueue(). + +set_operator_policy(#amqqueue{} = Queue, Policy) -> + Queue#amqqueue{operator_policy = Policy}; +set_operator_policy(Queue, Policy) -> + amqqueue_v1:set_operator_policy(Queue, Policy). + +% name + +-spec get_name(amqqueue()) -> rabbit_amqqueue:name(). + +get_name(#amqqueue{name = Name}) -> Name; +get_name(Queue) -> amqqueue_v1:get_name(Queue). + +-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). + +set_name(#amqqueue{} = Queue, Name) -> + Queue#amqqueue{name = Name}; +set_name(Queue, Name) -> + amqqueue_v1:set_name(Queue, Name). + +-spec get_options(amqqueue()) -> map(). + +get_options(#amqqueue{options = Options}) -> Options; +get_options(Queue) -> amqqueue_v1:get_options(Queue). + +% pid + +-spec get_pid +(amqqueue_v2()) -> pid() | ra_server_id() | none; +(amqqueue_v1:amqqueue_v1()) -> pid() | none. + +get_pid(#amqqueue{pid = Pid}) -> Pid; +get_pid(Queue) -> amqqueue_v1:get_pid(Queue). + +-spec set_pid +(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); +(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). + +set_pid(#amqqueue{} = Queue, Pid) -> + Queue#amqqueue{pid = Pid}; +set_pid(Queue, Pid) -> + amqqueue_v1:set_pid(Queue, Pid). + +% policy + +-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. + +get_policy(#amqqueue{policy = Policy}) -> Policy; +get_policy(Queue) -> amqqueue_v1:get_policy(Queue). + +-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). + +set_policy(#amqqueue{} = Queue, Policy) -> + Queue#amqqueue{policy = Policy}; +set_policy(Queue, Policy) -> + amqqueue_v1:set_policy(Queue, Policy). + +% policy_version + +-spec get_policy_version(amqqueue()) -> non_neg_integer(). + +get_policy_version(#amqqueue{policy_version = PV}) -> + PV; +get_policy_version(Queue) -> + amqqueue_v1:get_policy_version(Queue). + +-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). + +set_policy_version(#amqqueue{} = Queue, PV) -> + Queue#amqqueue{policy_version = PV}; +set_policy_version(Queue, PV) -> + amqqueue_v1:set_policy_version(Queue, PV). + +% recoverable_slaves + +-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. + +get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> + Slaves; +get_recoverable_slaves(Queue) -> + amqqueue_v1:get_recoverable_slaves(Queue). + +-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). + +set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> + Queue#amqqueue{recoverable_slaves = Slaves}; +set_recoverable_slaves(Queue, Slaves) -> + amqqueue_v1:set_recoverable_slaves(Queue, Slaves). + +% type_state (new in v2) + +-spec get_type_state(amqqueue()) -> map(). +get_type_state(#amqqueue{type_state = TState}) -> + TState; +get_type_state(_) -> + #{}. + +-spec set_type_state(amqqueue(), map()) -> amqqueue(). +set_type_state(#amqqueue{} = Queue, TState) -> + Queue#amqqueue{type_state = TState}; +set_type_state(Queue, _TState) -> + Queue. + +% slave_pids + +-spec get_slave_pids(amqqueue()) -> [pid()] | none. + +get_slave_pids(#amqqueue{slave_pids = Slaves}) -> + Slaves; +get_slave_pids(Queue) -> + amqqueue_v1:get_slave_pids(Queue). + +-spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). + +set_slave_pids(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids = SlavePids}; +set_slave_pids(Queue, SlavePids) -> + amqqueue_v1:set_slave_pids(Queue, SlavePids). + +% slave_pids_pending_shutdown + +-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()]. + +get_slave_pids_pending_shutdown( + #amqqueue{slave_pids_pending_shutdown = Slaves}) -> + Slaves; +get_slave_pids_pending_shutdown(Queue) -> + amqqueue_v1:get_slave_pids_pending_shutdown(Queue). + +-spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue(). + +set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> + Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}; +set_slave_pids_pending_shutdown(Queue, SlavePids) -> + amqqueue_v1:set_slave_pids_pending_shutdown(Queue, SlavePids). + +% state + +-spec get_state(amqqueue()) -> atom() | none. + +get_state(#amqqueue{state = State}) -> State; +get_state(Queue) -> amqqueue_v1:get_state(Queue). + +-spec set_state(amqqueue(), atom() | none) -> amqqueue(). + +set_state(#amqqueue{} = Queue, State) -> + Queue#amqqueue{state = State}; +set_state(Queue, State) -> + amqqueue_v1:set_state(Queue, State). + +% sync_slave_pids + +-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. + +get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> + Pids; +get_sync_slave_pids(Queue) -> + amqqueue_v1:get_sync_slave_pids(Queue). + +-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). + +set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> + Queue#amqqueue{sync_slave_pids = Pids}; +set_sync_slave_pids(Queue, Pids) -> + amqqueue_v1:set_sync_slave_pids(Queue, Pids). + +%% New in v2. + +-spec get_type(amqqueue()) -> atom(). + +get_type(#amqqueue{type = Type}) -> Type; +get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. + +-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. + +get_vhost(#amqqueue{vhost = VHost}) -> VHost; +get_vhost(Queue) -> amqqueue_v1:get_vhost(Queue). + +-spec is_auto_delete(amqqueue()) -> boolean(). + +is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> + AutoDelete; +is_auto_delete(Queue) -> + amqqueue_v1:is_auto_delete(Queue). + +-spec is_durable(amqqueue()) -> boolean(). + +is_durable(#amqqueue{durable = Durable}) -> Durable; +is_durable(Queue) -> amqqueue_v1:is_durable(Queue). + +-spec is_classic(amqqueue()) -> boolean(). + +is_classic(Queue) -> + get_type(Queue) =:= ?amqqueue_v1_type. + +-spec is_quorum(amqqueue()) -> boolean(). + +is_quorum(Queue) -> + get_type(Queue) =:= rabbit_quorum_queue. + +fields() -> + case record_version_to_use() of + ?record_version -> fields(?record_version); + _ -> amqqueue_v1:fields() + end. + +fields(?record_version) -> record_info(fields, amqqueue); +fields(Version) -> amqqueue_v1:fields(Version). + +field_vhost() -> + case record_version_to_use() of + ?record_version -> #amqqueue.vhost; + _ -> amqqueue_v1:field_vhost() + end. + +-spec pattern_match_all() -> amqqueue_pattern(). + +pattern_match_all() -> + case record_version_to_use() of + ?record_version -> #amqqueue{_ = '_'}; + _ -> amqqueue_v1:pattern_match_all() + end. + +-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern(). + +pattern_match_on_name(Name) -> + case record_version_to_use() of + ?record_version -> #amqqueue{name = Name, _ = '_'}; + _ -> amqqueue_v1:pattern_match_on_name(Name) + end. + +-spec pattern_match_on_type(atom()) -> amqqueue_pattern(). + +pattern_match_on_type(Type) -> + case record_version_to_use() of + ?record_version -> + #amqqueue{type = Type, _ = '_'}; + _ when ?is_backwards_compat_classic(Type) -> + amqqueue_v1:pattern_match_all(); + %% FIXME: We try a pattern which should never match when the + %% `quorum_queue` feature flag is not enabled yet. Is there + %% a better solution? + _ -> + amqqueue_v1:pattern_match_on_name( + rabbit_misc:r(<<0>>, queue, <<0>>)) + end. + +-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue(). + +reset_mirroring_and_decorators(#amqqueue{} = Queue) -> + Queue#amqqueue{slave_pids = [], + sync_slave_pids = [], + gm_pids = [], + decorators = undefined}; +reset_mirroring_and_decorators(Queue) -> + amqqueue_v1:reset_mirroring_and_decorators(Queue). + +-spec set_immutable(amqqueue()) -> amqqueue(). + +set_immutable(#amqqueue{} = Queue) -> + Queue#amqqueue{pid = none, + slave_pids = [], + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}; +set_immutable(Queue) -> + amqqueue_v1:set_immutable(Queue). + +-spec qnode(amqqueue() | pid() | ra_server_id()) -> node(). + +qnode(Queue) when ?is_amqqueue(Queue) -> + QPid = get_pid(Queue), + qnode(QPid); +qnode(QPid) when is_pid(QPid) -> + node(QPid); +qnode({_, Node}) -> + Node. + +% private + +macros() -> + io:format( + "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", + [?record_version, record_info(size, amqqueue)]), + %% The field number starts at 2 because the first element is the + %% record name. + macros(record_info(fields, amqqueue), 2). + +macros([Field | Rest], I) -> + io:format( + "-define(~s_field_~s(Q), element(~b, Q)).~n", + [?record_version, Field, I]), + macros(Rest, I + 1); +macros([], _) -> + ok. + +ensure_type_compat(classic) -> + ?amqqueue_v1_type; +ensure_type_compat(Type) -> + Type. |