diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-06-13 11:56:14 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-06-13 11:57:39 +0200 |
| commit | b7f3fd9a8edfcb5a8c818493440363ce0cc7c790 (patch) | |
| tree | 71f06767d7b15d7babacdffda3b401cedd6de04f | |
| parent | 1f5f8bb0863a3ddf14d2044bab5834ef8adba4b1 (diff) | |
| download | rabbitmq-server-git-b7f3fd9a8edfcb5a8c818493440363ce0cc7c790.tar.gz | |
amqqueue*: Uniformize API
Now, both modules export the same set of functions. This will help with
the backport to `v3.7.x` and thus the life of plugin developers who will
not have to have one copy of their plugin per RabbitMQ version.
| -rw-r--r-- | src/amqqueue.erl | 83 | ||||
| -rw-r--r-- | src/amqqueue_v1.erl | 208 |
2 files changed, 277 insertions, 14 deletions
diff --git a/src/amqqueue.erl b/src/amqqueue.erl index dda2642c34..35e7f0c4c4 100644 --- a/src/amqqueue.erl +++ b/src/amqqueue.erl @@ -19,7 +19,9 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). --export([new/9, +-export([new/8, + new/9, + new_with_version/9, new_with_version/10, fields/0, fields/1, @@ -184,6 +186,40 @@ pid() | none, rabbit_framing:amqp_table(), rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new(Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new(rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, map(), atom()) -> amqqueue(). @@ -238,6 +274,44 @@ new(#resource{kind = queue} = Name, pid() | none, rabbit_framing:amqp_table(), rabbit_types:vhost() | undefined, + map()) -> amqqueue(). + +new_with_version(RecordVersion, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options) + when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version(RecordVersion, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type). + +-spec new_with_version +(amqqueue_v1 | amqqueue_v2, + rabbit_amqqueue:name(), + pid() | ra_server_id() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, map(), atom()) -> amqqueue(). @@ -359,6 +433,8 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> get_exclusive_owner(Queue) -> amqqueue_v1:get_exclusive_owner(Queue). +% gm_pids + -spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none. get_gm_pids(#amqqueue{gm_pids = GMPids}) -> @@ -421,7 +497,7 @@ get_pid(#amqqueue{pid = Pid}) -> Pid; get_pid(Queue) -> amqqueue_v1:get_pid(Queue). -spec set_pid -(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); +(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); (amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). set_pid(#amqqueue{} = Queue, Pid) -> @@ -509,7 +585,8 @@ set_slave_pids(Queue, SlavePids) -> -spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()]. -get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> +get_slave_pids_pending_shutdown( + #amqqueue{slave_pids_pending_shutdown = Slaves}) -> Slaves; get_slave_pids_pending_shutdown(Queue) -> amqqueue_v1:get_slave_pids_pending_shutdown(Queue). diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl index 0a0073075f..9b739026c6 100644 --- a/src/amqqueue_v1.erl +++ b/src/amqqueue_v1.erl @@ -17,9 +17,12 @@ -module(amqqueue_v1). -include_lib("rabbit_common/include/resource.hrl"). +-include("amqqueue.hrl"). -export([new/8, + new/9, new_with_version/9, + new_with_version/10, fields/0, fields/1, field_vhost/0, @@ -37,7 +40,8 @@ % gm_pids get_gm_pids/1, set_gm_pids/2, - % name + get_leader/1, + % name (#resource) get_name/1, set_name/2, % operator_policy @@ -53,6 +57,9 @@ % policy_version get_policy_version/1, set_policy_version/2, + % quorum_nodes + get_quorum_nodes/1, + set_quorum_nodes/2, % recoverable_slaves get_recoverable_slaves/1, set_recoverable_slaves/2, @@ -68,14 +75,19 @@ % sync_slave_pids get_sync_slave_pids/1, set_sync_slave_pids/2, + get_type/1, get_vhost/1, is_amqqueue/1, is_auto_delete/1, is_durable/1, + is_classic/1, + is_quorum/1, pattern_match_all/0, pattern_match_on_name/1, + pattern_match_on_type/1, reset_mirroring_and_decorators/1, set_immutable/1, + qnode/1, macros/0]). -define(record_version, ?MODULE). @@ -192,6 +204,42 @@ new(#resource{kind = queue} = Name, VHost, Options). +-spec new(rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + ?amqqueue_v1_type) -> amqqueue(). + +new(#resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new( + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + -spec new_with_version(amqqueue_v1, rabbit_amqqueue:name(), pid() | none, @@ -227,6 +275,45 @@ new_with_version(?record_version, vhost = VHost, options = Options}. +-spec new_with_version(amqqueue_v1, + rabbit_amqqueue:name(), + pid() | none, + boolean(), + boolean(), + pid() | none, + rabbit_framing:amqp_table(), + rabbit_types:vhost() | undefined, + map(), + ?amqqueue_v1_type) -> amqqueue(). + +new_with_version(?record_version, + #resource{kind = queue} = Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + ?amqqueue_v1_type) + when (is_pid(Pid) orelse Pid =:= none) andalso + is_boolean(Durable) andalso + is_boolean(AutoDelete) andalso + (is_pid(Owner) orelse Owner =:= none) andalso + is_list(Args) andalso + (is_binary(VHost) orelse VHost =:= undefined) andalso + is_map(Options) -> + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options). + -spec is_amqqueue(any()) -> boolean(). is_amqqueue(#amqqueue{}) -> true; @@ -239,8 +326,7 @@ record_version_to_use() -> -spec upgrade(amqqueue()) -> amqqueue(). -upgrade(#amqqueue{} = Queue) -> - Queue. +upgrade(#amqqueue{} = Queue) -> Queue. -spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). @@ -260,66 +346,120 @@ set_arguments(#amqqueue{} = Queue, Args) -> % decorators +-spec get_decorators(amqqueue()) -> [atom()] | none | undefined. + get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. +-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). + set_decorators(#amqqueue{} = Queue, Decorators) -> Queue#amqqueue{decorators = Decorators}. +-spec get_exclusive_owner(amqqueue()) -> pid() | none. + get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. % gm_pids +-spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none. + get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. +-spec set_gm_pids(amqqueue(), [{pid(), pid()} | pid()] | none) -> amqqueue(). + set_gm_pids(#amqqueue{} = Queue, GMPids) -> Queue#amqqueue{gm_pids = GMPids}. -% name - -get_name(#amqqueue{name = Name}) -> Name. +-spec get_leader(amqqueue_v1()) -> no_return(). -set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}. +get_leader(_) -> throw({unsupported, ?record_version, get_leader}). % operator_policy +-spec get_operator_policy(amqqueue()) -> binary() | none | undefined. + get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. +-spec set_operator_policy(amqqueue(), binary() | none | undefined) -> + amqqueue(). + set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> Queue#amqqueue{operator_policy = OpPolicy}. +% name + +-spec get_name(amqqueue()) -> rabbit_amqqueue:name(). + +get_name(#amqqueue{name = Name}) -> Name. + +-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). + +set_name(#amqqueue{} = Queue, Name) -> + Queue#amqqueue{name = Name}. + +-spec get_options(amqqueue()) -> map(). + get_options(#amqqueue{options = Options}) -> Options. % pid +-spec get_pid +(amqqueue_v1:amqqueue_v1()) -> pid() | none. + get_pid(#amqqueue{pid = Pid}) -> Pid. +-spec set_pid +(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). + set_pid(#amqqueue{} = Queue, Pid) -> Queue#amqqueue{pid = Pid}. % policy +-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. + get_policy(#amqqueue{policy = Policy}) -> Policy. +-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). + set_policy(#amqqueue{} = Queue, Policy) -> Queue#amqqueue{policy = Policy}. % policy_version +-spec get_policy_version(amqqueue()) -> non_neg_integer(). + get_policy_version(#amqqueue{policy_version = PV}) -> PV. +-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). + set_policy_version(#amqqueue{} = Queue, PV) -> Queue#amqqueue{policy_version = PV}. % recoverable_slaves +-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. + get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> Slaves. +-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). + set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> Queue#amqqueue{recoverable_slaves = Slaves}. +% quorum_nodes (new in v2) + +-spec get_quorum_nodes(amqqueue()) -> no_return(). + +get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}). + +-spec set_quorum_nodes(amqqueue(), [node()]) -> no_return(). + +set_quorum_nodes(_, _) -> + throw({unsupported, ?record_version, set_quorum_nodes}). + % slave_pids get_slave_pids(#amqqueue{slave_pids = Slaves}) -> @@ -330,7 +470,8 @@ set_slave_pids(#amqqueue{} = Queue, SlavePids) -> % slave_pids_pending_shutdown -get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) -> +get_slave_pids_pending_shutdown( + #amqqueue{slave_pids_pending_shutdown = Slaves}) -> Slaves. set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> @@ -338,23 +479,55 @@ set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> % state +-spec get_state(amqqueue()) -> atom() | none. + get_state(#amqqueue{state = State}) -> State. -set_state(#amqqueue{} = Queue, State) -> Queue#amqqueue{state = State}. +-spec set_state(amqqueue(), atom() | none) -> amqqueue(). + +set_state(#amqqueue{} = Queue, State) -> + Queue#amqqueue{state = State}. % sync_slave_pids -get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> Pids. +-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. + +get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> + Pids. + +-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> Queue#amqqueue{sync_slave_pids = Pids}. +%% New in v2. + +-spec get_type(amqqueue()) -> atom(). + +get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. + +-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. + get_vhost(#amqqueue{vhost = VHost}) -> VHost. +-spec is_auto_delete(amqqueue()) -> boolean(). + is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. +-spec is_durable(amqqueue()) -> boolean(). + is_durable(#amqqueue{durable = Durable}) -> Durable. +-spec is_classic(amqqueue()) -> boolean(). + +is_classic(Queue) -> + get_type(Queue) =:= ?amqqueue_v1_type. + +-spec is_quorum(amqqueue()) -> boolean(). + +is_quorum(Queue) -> + get_type(Queue) =:= quorum. + fields() -> fields(?record_version). fields(?record_version) -> record_info(fields, amqqueue). @@ -370,6 +543,11 @@ pattern_match_all() -> #amqqueue{_ = '_'}. pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. +-spec pattern_match_on_type(atom()) -> no_return(). + +pattern_match_on_type(_) -> + throw({unsupported, ?record_version, pattern_match_on_type}). + reset_mirroring_and_decorators(#amqqueue{} = Queue) -> Queue#amqqueue{slave_pids = [], sync_slave_pids = [], @@ -386,6 +564,14 @@ set_immutable(#amqqueue{} = Queue) -> decorators = none, state = none}. +-spec qnode(amqqueue() | pid()) -> node(). + +qnode(Queue) when ?is_amqqueue(Queue) -> + QPid = get_pid(Queue), + qnode(QPid); +qnode(QPid) when is_pid(QPid) -> + node(QPid). + macros() -> io:format( "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", |
