summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2018-01-22 16:44:00 +0000
committerGitHub <noreply@github.com>2018-01-22 16:44:00 +0000
commit829e149f05a9e01b70909446b722f5817cf0bc39 (patch)
treeda878b350582548acbc173dc34ed482e9597f413
parent0b1417dfe70ef92121077507d6508a5c7d757bc2 (diff)
parent838656c351f7259feac79f73545d96f04cc5c2c2 (diff)
downloadrabbitmq-server-git-829e149f05a9e01b70909446b722f5817cf0bc39.tar.gz
Merge pull request #1478 from rabbitmq/rabbitmq-server-story-154472130
Declare a quorum queue using the queue.declare method
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_upgrade_functions.erl22
2 files changed, 42 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ce73460603..7e0b69e2f6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -335,7 +335,8 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
policy_version = 0,
slave_pids_pending_shutdown = [],
vhost = VHost,
- options = #{user => ActingUser}})),
+ options = #{user => ActingUser},
+ type = get_queue_type(Args)})),
Node1 = case rabbit_queue_master_location_misc:get_location(Q) of
{ok, Node0} -> Node0;
@@ -353,6 +354,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
[rabbit_misc:rs(QueueName), Node1, Error])
end.
+get_queue_type(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
+ undefined ->
+ classic;
+ {_, V} ->
+ binary_to_atom(V, utf8)
+ end.
+
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
@@ -577,7 +586,8 @@ declare_args() ->
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
- {<<"x-queue-mode">>, fun check_queue_mode/2}].
+ {<<"x-queue-mode">>, fun check_queue_mode/2},
+ {<<"x-queue-type">>, fun check_queue_type/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -640,6 +650,14 @@ check_queue_mode({longstr, Val}, _Args) ->
check_queue_mode({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+check_queue_type({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"classic">>, <<"quorum">>]) of
+ true -> ok;
+ false -> {error, invalid_queue_type}
+ end;
+check_queue_type({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 498db6e01c..6496130071 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -60,6 +60,7 @@
-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
-rabbit_upgrade({topic_permission, mnesia, []}).
-rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}).
+-rabbit_upgrade({queue_type, mnesia, [queue_options]}).
-rabbit_upgrade({exchange_options, mnesia, [operator_policies]}).
%% -------------------------------------------------------------------
@@ -98,6 +99,7 @@
-spec operator_policies() -> 'ok'.
-spec queue_vhost_field() -> 'ok'.
-spec queue_options() -> 'ok'.
+-spec queue_type() -> 'ok'.
-spec exchange_options() -> 'ok'.
@@ -576,6 +578,26 @@ queue_options(Table) ->
sync_slave_pids, recoverable_slaves, policy, operator_policy,
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options]).
+queue_type() ->
+ ok = queue_type(rabbit_queue),
+ ok = queue_type(rabbit_durable_queue),
+ ok.
+
+queue_type(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, classic}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, operator_policy,
+ gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options,
+ type]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal