summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmq.conf.example6
-rw-r--r--priv/schema/rabbit.schema10
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--src/rabbit_binding.erl39
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets13
-rw-r--r--test/dead_lettering_SUITE.erl17
-rw-r--r--test/dynamic_ha_SUITE.erl40
-rw-r--r--test/dynamic_qq_SUITE.erl17
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl17
-rw-r--r--test/queue_parallel_SUITE.erl13
-rw-r--r--test/quorum_queue_SUITE.erl34
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl17
12 files changed, 80 insertions, 144 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index 81958c89fd..1aa3943a86 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -509,10 +509,10 @@
# net_ticktime = 60
## Inter-node communication port range.
+## The parameters inet_dist_listen_min and inet_dist_listen_max
+## can be configured in the classic config format only.
## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range.
-##
-# inet_dist_listen_min = 25672
-# inet_dist_listen_max = 25692
+
## ----------------------------------------------------------------------------
## RabbitMQ Management Plugin
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 2617f558f7..4cc543d99f 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -1352,16 +1352,6 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.
-{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[
- {datatype, [integer]},
- {validators, ["non_zero_positive_integer"]}
-]}.
-
-{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[
- {datatype, [integer]},
- {validators, ["non_zero_positive_integer"]}
-]}.
-
% ==========================
% sysmon_handler section
% ==========================
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b4863057f0..4bb680cccf 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -187,6 +187,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost"
NOTIFY_SOCKET= \
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
+RABBITMQ_CONFIG_ARG_FILE=$RABBITMQ_CONFIG_ARG_FILE \
RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
-boot "${CLEAN_BOOT_FILE}" \
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ab3bc6c819..05db4188ba 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -49,7 +49,6 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error(
- 'binding_not_found' |
{'binding_invalid', string(), [any()]}).
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
@@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) ->
lock_resource(Src),
lock_resource(Dst),
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (SrcDurable andalso DstDurable andalso
- mnesia:read({rabbit_durable_route, B}) =/= []) of
- false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
- fun mnesia:write/3),
- x_callback(transaction, Src, add_binding, B),
- Serial = rabbit_exchange:serial(Src),
- fun () ->
- x_callback(Serial, Src, add_binding, B),
- ok = rabbit_event:notify(
- binding_created,
- info(B) ++ [{user_who_performed_action, ActingUser}])
- end;
- true -> rabbit_misc:const({error, binding_not_found})
+ ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
+ x_callback(transaction, Src, add_binding, B),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(
+ binding_created,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
end.
-spec remove(rabbit_types:binding()) -> bind_res().
@@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) ->
case mnesia:read(rabbit_route, B, write) of
[] -> case mnesia:read(rabbit_durable_route, B, write) of
[] -> rabbit_misc:const(ok);
- _ -> rabbit_misc:const({error, binding_not_found})
+ %% We still delete the binding and run
+ %% all post-delete functions if there is only
+ %% a durable route in the database
+ _ -> remove(Src, Dst, B, ActingUser)
end;
_ -> case InnerFun(Src, Dst) of
ok -> remove(Src, Dst, B, ActingUser);
@@ -275,9 +273,8 @@ list_for_source(SrcName) ->
-spec list_for_destination
(rabbit_types:binding_destination()) -> bindings().
-list_for_destination(DstName) ->
- implicit_for_destination(DstName) ++
- mnesia:async_dirty(
+list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
+ AllBindings = mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
@@ -285,7 +282,11 @@ list_for_destination(DstName) ->
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
- end).
+ end),
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_for_destination(DstName) ++ Filtered.
implicit_bindings(VHostPath) ->
DstQueues = rabbit_amqqueue:list_names(VHostPath),
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index 0cd274b757..9afddd9b1e 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -580,19 +580,6 @@ credential_validator.regexp = ^abc\\d+",
]}],
[]},
- {kernel_inet_dist_listen_min,
- "inet_dist_listen_min = 16000",
- [{kernel, [
- {inet_dist_listen_min, 16000}
- ]}],
- []},
- {kernel_inet_dist_listen_max,
- "inet_dist_listen_max = 16100",
- [{kernel, [
- {inet_dist_listen_max, 16100}
- ]}],
- []},
-
{log_syslog_settings,
"log.syslog = true
log.syslog.identity = rabbitmq
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 6722958973..c255949d67 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -93,23 +93,14 @@ init_per_group(classic_queue, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(quorum_queue, Config) ->
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
{queue_durable, true}]);
- false ->
- {skip, "Quorum queues are unsupported"}
+ Skip ->
+ Skip
end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 3bdf7bb009..023f55648b 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -61,7 +61,7 @@ groups() ->
promote_on_shutdown,
promote_on_failure,
slave_recovers_after_vhost_failure,
- slave_recovers_after_vhost_down_an_up,
+ slave_recovers_after_vhost_down_and_up,
master_migrates_on_vhost_down,
slave_recovers_after_vhost_down_and_master_migrated,
queue_survive_adding_dead_vhost_mirror
@@ -133,7 +133,7 @@ change_policy(Config) ->
%% When we first declare a queue with no policy, it's not HA.
amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME}),
- timer:sleep(100),
+ timer:sleep(200),
assert_slaves(A, ?QNAME, {A, ''}),
%% Give it policy "all", it becomes HA and gets all mirrors
@@ -417,7 +417,7 @@ slave_recovers_after_vhost_failure(Config) ->
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = <<"slave_recovers_after_vhost_failure-q">>,
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
- timer:sleep(300),
+ timer:sleep(500),
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
%% Crash vhost on a node hosting a mirror
@@ -426,22 +426,25 @@ slave_recovers_after_vhost_failure(Config) ->
assert_slaves(A, QName, {A, [B]}, [{A, []}]).
-slave_recovers_after_vhost_down_an_up(Config) ->
+slave_recovers_after_vhost_down_and_up(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy_all(Config),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
- QName = <<"slave_recovers_after_vhost_down_an_up-q">>,
+ QName = <<"slave_recovers_after_vhost_down_and_up-q">>,
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
- timer:sleep(100),
+ timer:sleep(200),
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
%% Crash vhost on a node hosting a mirror
rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>),
- %% Vhost is down now
- false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]),
- timer:sleep(300),
+ %% rabbit_ct_broker_helpers:force_vhost_failure/2 will retry up to 10 times to
+ %% make sure that the top vhost supervision tree process did go down. MK.
+ timer:sleep(500),
%% Vhost is back up
- {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
+ case rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of
+ {ok, _Sup} -> ok;
+ {error,{already_started, _Sup}} -> ok
+ end,
assert_slaves(A, QName, {A, [B]}, [{A, []}]).
@@ -451,12 +454,12 @@ master_migrates_on_vhost_down(Config) ->
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = <<"master_migrates_on_vhost_down-q">>,
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
- timer:sleep(100),
+ timer:sleep(200),
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
%% Crash vhost on the node hosting queue master
rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
- timer:sleep(300),
+ timer:sleep(500),
assert_slaves(A, QName, {B, []}).
slave_recovers_after_vhost_down_and_master_migrated(Config) ->
@@ -465,16 +468,19 @@ slave_recovers_after_vhost_down_and_master_migrated(Config) ->
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>,
amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
- timer:sleep(100),
+ timer:sleep(200),
assert_slaves(A, QName, {A, [B]}, [{A, []}]),
%% Crash vhost on the node hosting queue master
rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
- timer:sleep(300),
+ timer:sleep(500),
assert_slaves(B, QName, {B, []}),
%% Restart the vhost on the node (previously) hosting queue master
- {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
- timer:sleep(300),
+ case rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of
+ {ok, _Sup} -> ok;
+ {error,{already_started, _Sup}} -> ok
+ end,
+ timer:sleep(500),
assert_slaves(B, QName, {B, [A]}, [{B, []}]).
random_policy(Config) ->
@@ -569,7 +575,7 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Att
State ->
ct:pal("Waiting to leave state ~p~n Waiting for ~p~n",
[State, {ExpMNode, ExpSNodes}]),
- timer:sleep(100),
+ timer:sleep(200),
assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes},
PermittedIntermediate,
Attempts - 1)
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index 89344af30c..c59ddcc1fd 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -87,21 +87,12 @@ init_per_testcase(Testcase, Config) ->
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()),
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config2, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config2, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+ ok ->
Config2;
- false ->
+ Skip ->
end_per_testcase(Testcase, Config2),
- {skip, "Quorum queues are unsupported"}
+ Skip
end.
end_per_testcase(Testcase, Config) ->
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index aa15f9e420..c0c7c3f973 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -74,23 +74,14 @@ init_per_group(classic_queue, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
{queue_durable, true}]);
- false ->
- {skip, "Quorum queues are unsupported"}
+ Skip ->
+ Skip
end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index 1290c97b23..8d226a4327 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -88,10 +88,15 @@ init_per_group(classic_queue, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index e9f49039ed..43a353d0ea 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -151,17 +151,8 @@ init_per_group(Group, Config) ->
Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config2, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config2, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+ ok ->
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
[rabbit, channel_queue_cleanup_interval, 100]),
@@ -174,9 +165,9 @@ init_per_group(Group, Config) ->
_ ->
Config2
end;
- false ->
+ Skip ->
end_per_group(Group, Config2),
- {skip, "Quorum queues are unsupported"}
+ Skip
end.
end_per_group(clustered, Config) ->
@@ -206,21 +197,12 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
rabbit_ct_client_helpers:setup_steps() ++
[fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
fun rabbit_ct_broker_helpers:cluster_nodes/1]),
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config3, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config3, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config3, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config3, quorum_queue) of
+ ok ->
Config3;
- false ->
+ Skip ->
end_per_testcase(Testcase, Config3),
- {skip, "Quorum queues are unsupported"}
+ Skip
end;
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 9518e06196..680a576554 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -98,23 +98,14 @@ init_per_group(max_length_classic, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(max_length_quorum, Config) ->
- Nodes = rabbit_ct_broker_helpers:get_node_configs(
- Config, nodename),
- Ret = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_feature_flags,
- is_supported_remotely,
- [Nodes, [quorum_queue], 60000]),
- case Ret of
- true ->
- ok = rabbit_ct_broker_helpers:rpc(
- Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
{queue_durable, true}]);
- false ->
- {skip, "Quorum queues are unsupported"}
+ Skip ->
+ Skip
end;
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,