summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-06-13 11:56:14 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-06-13 11:57:39 +0200
commitb7f3fd9a8edfcb5a8c818493440363ce0cc7c790 (patch)
tree71f06767d7b15d7babacdffda3b401cedd6de04f
parent1f5f8bb0863a3ddf14d2044bab5834ef8adba4b1 (diff)
downloadrabbitmq-server-git-b7f3fd9a8edfcb5a8c818493440363ce0cc7c790.tar.gz
amqqueue*: Uniformize API
Now, both modules export the same set of functions. This will help with the backport to `v3.7.x` and thus the life of plugin developers who will not have to have one copy of their plugin per RabbitMQ version.
-rw-r--r--src/amqqueue.erl83
-rw-r--r--src/amqqueue_v1.erl208
2 files changed, 277 insertions, 14 deletions
diff --git a/src/amqqueue.erl b/src/amqqueue.erl
index dda2642c34..35e7f0c4c4 100644
--- a/src/amqqueue.erl
+++ b/src/amqqueue.erl
@@ -19,7 +19,9 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
--export([new/9,
+-export([new/8,
+ new/9,
+ new_with_version/9,
new_with_version/10,
fields/0,
fields/1,
@@ -184,6 +186,40 @@
pid() | none,
rabbit_framing:amqp_table(),
rabbit_types:vhost() | undefined,
+ map()) -> amqqueue().
+
+new(#resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options)
+ when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ new(Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ ?amqqueue_v1_type).
+
+-spec new(rabbit_amqqueue:name(),
+ pid() | ra_server_id() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
map(),
atom()) -> amqqueue().
@@ -238,6 +274,44 @@ new(#resource{kind = queue} = Name,
pid() | none,
rabbit_framing:amqp_table(),
rabbit_types:vhost() | undefined,
+ map()) -> amqqueue().
+
+new_with_version(RecordVersion,
+ #resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options)
+ when (is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ new_with_version(RecordVersion,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ ?amqqueue_v1_type).
+
+-spec new_with_version
+(amqqueue_v1 | amqqueue_v2,
+ rabbit_amqqueue:name(),
+ pid() | ra_server_id() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
map(),
atom()) -> amqqueue().
@@ -359,6 +433,8 @@ get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) ->
get_exclusive_owner(Queue) ->
amqqueue_v1:get_exclusive_owner(Queue).
+% gm_pids
+
-spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none.
get_gm_pids(#amqqueue{gm_pids = GMPids}) ->
@@ -421,7 +497,7 @@ get_pid(#amqqueue{pid = Pid}) -> Pid;
get_pid(Queue) -> amqqueue_v1:get_pid(Queue).
-spec set_pid
-(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2();
+(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2();
(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().
set_pid(#amqqueue{} = Queue, Pid) ->
@@ -509,7 +585,8 @@ set_slave_pids(Queue, SlavePids) ->
-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()].
-get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) ->
+get_slave_pids_pending_shutdown(
+ #amqqueue{slave_pids_pending_shutdown = Slaves}) ->
Slaves;
get_slave_pids_pending_shutdown(Queue) ->
amqqueue_v1:get_slave_pids_pending_shutdown(Queue).
diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl
index 0a0073075f..9b739026c6 100644
--- a/src/amqqueue_v1.erl
+++ b/src/amqqueue_v1.erl
@@ -17,9 +17,12 @@
-module(amqqueue_v1).
-include_lib("rabbit_common/include/resource.hrl").
+-include("amqqueue.hrl").
-export([new/8,
+ new/9,
new_with_version/9,
+ new_with_version/10,
fields/0,
fields/1,
field_vhost/0,
@@ -37,7 +40,8 @@
% gm_pids
get_gm_pids/1,
set_gm_pids/2,
- % name
+ get_leader/1,
+ % name (#resource)
get_name/1,
set_name/2,
% operator_policy
@@ -53,6 +57,9 @@
% policy_version
get_policy_version/1,
set_policy_version/2,
+ % quorum_nodes
+ get_quorum_nodes/1,
+ set_quorum_nodes/2,
% recoverable_slaves
get_recoverable_slaves/1,
set_recoverable_slaves/2,
@@ -68,14 +75,19 @@
% sync_slave_pids
get_sync_slave_pids/1,
set_sync_slave_pids/2,
+ get_type/1,
get_vhost/1,
is_amqqueue/1,
is_auto_delete/1,
is_durable/1,
+ is_classic/1,
+ is_quorum/1,
pattern_match_all/0,
pattern_match_on_name/1,
+ pattern_match_on_type/1,
reset_mirroring_and_decorators/1,
set_immutable/1,
+ qnode/1,
macros/0]).
-define(record_version, ?MODULE).
@@ -192,6 +204,42 @@ new(#resource{kind = queue} = Name,
VHost,
Options).
+-spec new(rabbit_amqqueue:name(),
+ pid() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map(),
+ ?amqqueue_v1_type) -> amqqueue().
+
+new(#resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ ?amqqueue_v1_type)
+ when (is_pid(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ new(
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options).
+
-spec new_with_version(amqqueue_v1,
rabbit_amqqueue:name(),
pid() | none,
@@ -227,6 +275,45 @@ new_with_version(?record_version,
vhost = VHost,
options = Options}.
+-spec new_with_version(amqqueue_v1,
+ rabbit_amqqueue:name(),
+ pid() | none,
+ boolean(),
+ boolean(),
+ pid() | none,
+ rabbit_framing:amqp_table(),
+ rabbit_types:vhost() | undefined,
+ map(),
+ ?amqqueue_v1_type) -> amqqueue().
+
+new_with_version(?record_version,
+ #resource{kind = queue} = Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options,
+ ?amqqueue_v1_type)
+ when (is_pid(Pid) orelse Pid =:= none) andalso
+ is_boolean(Durable) andalso
+ is_boolean(AutoDelete) andalso
+ (is_pid(Owner) orelse Owner =:= none) andalso
+ is_list(Args) andalso
+ (is_binary(VHost) orelse VHost =:= undefined) andalso
+ is_map(Options) ->
+ new_with_version(
+ ?record_version,
+ Name,
+ Pid,
+ Durable,
+ AutoDelete,
+ Owner,
+ Args,
+ VHost,
+ Options).
+
-spec is_amqqueue(any()) -> boolean().
is_amqqueue(#amqqueue{}) -> true;
@@ -239,8 +326,7 @@ record_version_to_use() ->
-spec upgrade(amqqueue()) -> amqqueue().
-upgrade(#amqqueue{} = Queue) ->
- Queue.
+upgrade(#amqqueue{} = Queue) -> Queue.
-spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue().
@@ -260,66 +346,120 @@ set_arguments(#amqqueue{} = Queue, Args) ->
% decorators
+-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
+
get_decorators(#amqqueue{decorators = Decorators}) -> Decorators.
+-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue().
+
set_decorators(#amqqueue{} = Queue, Decorators) ->
Queue#amqqueue{decorators = Decorators}.
+-spec get_exclusive_owner(amqqueue()) -> pid() | none.
+
get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner.
% gm_pids
+-spec get_gm_pids(amqqueue()) -> [{pid(), pid()} | pid()] | none.
+
get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids.
+-spec set_gm_pids(amqqueue(), [{pid(), pid()} | pid()] | none) -> amqqueue().
+
set_gm_pids(#amqqueue{} = Queue, GMPids) ->
Queue#amqqueue{gm_pids = GMPids}.
-% name
-
-get_name(#amqqueue{name = Name}) -> Name.
+-spec get_leader(amqqueue_v1()) -> no_return().
-set_name(#amqqueue{} = Queue, Name) ->
- Queue#amqqueue{name = Name}.
+get_leader(_) -> throw({unsupported, ?record_version, get_leader}).
% operator_policy
+-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.
+
get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy.
+-spec set_operator_policy(amqqueue(), binary() | none | undefined) ->
+ amqqueue().
+
set_operator_policy(#amqqueue{} = Queue, OpPolicy) ->
Queue#amqqueue{operator_policy = OpPolicy}.
+% name
+
+-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
+
+get_name(#amqqueue{name = Name}) -> Name.
+
+-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
+
+set_name(#amqqueue{} = Queue, Name) ->
+ Queue#amqqueue{name = Name}.
+
+-spec get_options(amqqueue()) -> map().
+
get_options(#amqqueue{options = Options}) -> Options.
% pid
+-spec get_pid
+(amqqueue_v1:amqqueue_v1()) -> pid() | none.
+
get_pid(#amqqueue{pid = Pid}) -> Pid.
+-spec set_pid
+(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().
+
set_pid(#amqqueue{} = Queue, Pid) ->
Queue#amqqueue{pid = Pid}.
% policy
+-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.
+
get_policy(#amqqueue{policy = Policy}) -> Policy.
+-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().
+
set_policy(#amqqueue{} = Queue, Policy) ->
Queue#amqqueue{policy = Policy}.
% policy_version
+-spec get_policy_version(amqqueue()) -> non_neg_integer().
+
get_policy_version(#amqqueue{policy_version = PV}) ->
PV.
+-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue().
+
set_policy_version(#amqqueue{} = Queue, PV) ->
Queue#amqqueue{policy_version = PV}.
% recoverable_slaves
+-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none.
+
get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
Slaves.
+-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue().
+
set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
Queue#amqqueue{recoverable_slaves = Slaves}.
+% quorum_nodes (new in v2)
+
+-spec get_quorum_nodes(amqqueue()) -> no_return().
+
+get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}).
+
+-spec set_quorum_nodes(amqqueue(), [node()]) -> no_return().
+
+set_quorum_nodes(_, _) ->
+ throw({unsupported, ?record_version, set_quorum_nodes}).
+
% slave_pids
get_slave_pids(#amqqueue{slave_pids = Slaves}) ->
@@ -330,7 +470,8 @@ set_slave_pids(#amqqueue{} = Queue, SlavePids) ->
% slave_pids_pending_shutdown
-get_slave_pids_pending_shutdown(#amqqueue{slave_pids_pending_shutdown = Slaves}) ->
+get_slave_pids_pending_shutdown(
+ #amqqueue{slave_pids_pending_shutdown = Slaves}) ->
Slaves.
set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) ->
@@ -338,23 +479,55 @@ set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) ->
% state
+-spec get_state(amqqueue()) -> atom() | none.
+
get_state(#amqqueue{state = State}) -> State.
-set_state(#amqqueue{} = Queue, State) -> Queue#amqqueue{state = State}.
+-spec set_state(amqqueue(), atom() | none) -> amqqueue().
+
+set_state(#amqqueue{} = Queue, State) ->
+ Queue#amqqueue{state = State}.
% sync_slave_pids
-get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> Pids.
+-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none.
+
+get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) ->
+ Pids.
+
+-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().
set_sync_slave_pids(#amqqueue{} = Queue, Pids) ->
Queue#amqqueue{sync_slave_pids = Pids}.
+%% New in v2.
+
+-spec get_type(amqqueue()) -> atom().
+
+get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type.
+
+-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
+
get_vhost(#amqqueue{vhost = VHost}) -> VHost.
+-spec is_auto_delete(amqqueue()) -> boolean().
+
is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete.
+-spec is_durable(amqqueue()) -> boolean().
+
is_durable(#amqqueue{durable = Durable}) -> Durable.
+-spec is_classic(amqqueue()) -> boolean().
+
+is_classic(Queue) ->
+ get_type(Queue) =:= ?amqqueue_v1_type.
+
+-spec is_quorum(amqqueue()) -> boolean().
+
+is_quorum(Queue) ->
+ get_type(Queue) =:= quorum.
+
fields() -> fields(?record_version).
fields(?record_version) -> record_info(fields, amqqueue).
@@ -370,6 +543,11 @@ pattern_match_all() -> #amqqueue{_ = '_'}.
pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}.
+-spec pattern_match_on_type(atom()) -> no_return().
+
+pattern_match_on_type(_) ->
+ throw({unsupported, ?record_version, pattern_match_on_type}).
+
reset_mirroring_and_decorators(#amqqueue{} = Queue) ->
Queue#amqqueue{slave_pids = [],
sync_slave_pids = [],
@@ -386,6 +564,14 @@ set_immutable(#amqqueue{} = Queue) ->
decorators = none,
state = none}.
+-spec qnode(amqqueue() | pid()) -> node().
+
+qnode(Queue) when ?is_amqqueue(Queue) ->
+ QPid = get_pid(Queue),
+ qnode(QPid);
+qnode(QPid) when is_pid(QPid) ->
+ node(QPid).
+
macros() ->
io:format(
"-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",