diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 22 |
2 files changed, 42 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ce73460603..7e0b69e2f6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -335,7 +335,8 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, policy_version = 0, slave_pids_pending_shutdown = [], vhost = VHost, - options = #{user => ActingUser}})), + options = #{user => ActingUser}, + type = get_queue_type(Args)})), Node1 = case rabbit_queue_master_location_misc:get_location(Q) of {ok, Node0} -> Node0; @@ -353,6 +354,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, [rabbit_misc:rs(QueueName), Node1, Error]) end. +get_queue_type(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of + undefined -> + classic; + {_, V} -> + binary_to_atom(V, utf8) + end. + internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> @@ -577,7 +586,8 @@ declare_args() -> {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_non_neg_int_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, - {<<"x-queue-mode">>, fun check_queue_mode/2}]. + {<<"x-queue-mode">>, fun check_queue_mode/2}, + {<<"x-queue-type">>, fun check_queue_type/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -640,6 +650,14 @@ check_queue_mode({longstr, Val}, _Args) -> check_queue_mode({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +check_queue_type({longstr, Val}, _Args) -> + case lists:member(Val, [<<"classic">>, <<"quorum">>]) of + true -> ok; + false -> {error, invalid_queue_type} + end; +check_queue_type({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 498db6e01c..6496130071 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -60,6 +60,7 @@ -rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}). -rabbit_upgrade({topic_permission, mnesia, []}). -rabbit_upgrade({queue_options, mnesia, [queue_vhost_field]}). +-rabbit_upgrade({queue_type, mnesia, [queue_options]}). -rabbit_upgrade({exchange_options, mnesia, [operator_policies]}). %% ------------------------------------------------------------------- @@ -98,6 +99,7 @@ -spec operator_policies() -> 'ok'. -spec queue_vhost_field() -> 'ok'. -spec queue_options() -> 'ok'. +-spec queue_type() -> 'ok'. -spec exchange_options() -> 'ok'. @@ -576,6 +578,26 @@ queue_options(Table) -> sync_slave_pids, recoverable_slaves, policy, operator_policy, gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options]). +queue_type() -> + ok = queue_type(rabbit_queue), + ok = queue_type(rabbit_durable_queue), + ok. + +queue_type(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown, VHost, Options, classic} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, operator_policy, + gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost, options, + type]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal |
