diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-02-16 15:48:26 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2019-02-16 15:48:26 +0300 |
| commit | 391eb24d1f2fd0d2dc381d2e078496bb9d0d2348 (patch) | |
| tree | 4361903e79cffd431387629e8c5cfd900a4d8631 | |
| parent | 7521ab498a4954eac580e0cf19affd1b9e8ae3d6 (diff) | |
| parent | 111773916b2543541414ec5bd9084b97caaf9e5b (diff) | |
| download | rabbitmq-server-git-391eb24d1f2fd0d2dc381d2e078496bb9d0d2348.tar.gz | |
Merge branch 'master' into rabbitmq-server-1873-binding-recovery
| -rw-r--r-- | docs/rabbitmq.conf.example | 6 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 10 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server | 1 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 39 | ||||
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 13 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 40 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 13 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 34 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 17 |
12 files changed, 80 insertions, 144 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index 81958c89fd..1aa3943a86 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -509,10 +509,10 @@ # net_ticktime = 60 ## Inter-node communication port range. +## The parameters inet_dist_listen_min and inet_dist_listen_max +## can be configured in the classic config format only. ## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range. -## -# inet_dist_listen_min = 25672 -# inet_dist_listen_max = 25692 + ## ---------------------------------------------------------------------------- ## RabbitMQ Management Plugin diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 2617f558f7..4cc543d99f 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -1352,16 +1352,6 @@ end}. {validators, ["non_zero_positive_integer"]} ]}. -{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[ - {datatype, [integer]}, - {validators, ["non_zero_positive_integer"]} -]}. - -{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[ - {datatype, [integer]}, - {validators, ["non_zero_positive_integer"]} -]}. - % ========================== % sysmon_handler section % ========================== diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b4863057f0..4bb680cccf 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -187,6 +187,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost" NOTIFY_SOCKET= \ RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \ ERL_CRASH_DUMP=$ERL_CRASH_DUMP \ +RABBITMQ_CONFIG_ARG_FILE=$RABBITMQ_CONFIG_ARG_FILE \ RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \ ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ -boot "${CLEAN_BOOT_FILE}" \ diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ab3bc6c819..05db4188ba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -49,7 +49,6 @@ -type bind_ok_or_error() :: 'ok' | bind_errors() | rabbit_types:error( - 'binding_not_found' | {'binding_invalid', string(), [any()]}). -type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()). -type inner_fun() :: @@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) -> lock_resource(Src), lock_resource(Dst), [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], - case (SrcDurable andalso DstDurable andalso - mnesia:read({rabbit_durable_route, B}) =/= []) of - false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, - fun mnesia:write/3), - x_callback(transaction, Src, add_binding, B), - Serial = rabbit_exchange:serial(Src), - fun () -> - x_callback(Serial, Src, add_binding, B), - ok = rabbit_event:notify( - binding_created, - info(B) ++ [{user_who_performed_action, ActingUser}]) - end; - true -> rabbit_misc:const({error, binding_not_found}) + ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + fun mnesia:write/3), + x_callback(transaction, Src, add_binding, B), + Serial = rabbit_exchange:serial(Src), + fun () -> + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify( + binding_created, + info(B) ++ [{user_who_performed_action, ActingUser}]) end. -spec remove(rabbit_types:binding()) -> bind_res(). @@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) -> case mnesia:read(rabbit_route, B, write) of [] -> case mnesia:read(rabbit_durable_route, B, write) of [] -> rabbit_misc:const(ok); - _ -> rabbit_misc:const({error, binding_not_found}) + %% We still delete the binding and run + %% all post-delete functions if there is only + %% a durable route in the database + _ -> remove(Src, Dst, B, ActingUser) end; _ -> case InnerFun(Src, Dst) of ok -> remove(Src, Dst, B, ActingUser); @@ -275,9 +273,8 @@ list_for_source(SrcName) -> -spec list_for_destination (rabbit_types:binding_destination()) -> bindings(). -list_for_destination(DstName) -> - implicit_for_destination(DstName) ++ - mnesia:async_dirty( +list_for_destination(DstName = #resource{virtual_host = VHostPath}) -> + AllBindings = mnesia:async_dirty( fun() -> Route = #route{binding = #binding{destination = DstName, _ = '_'}}, @@ -285,7 +282,11 @@ list_for_destination(DstName) -> #reverse_route{reverse_binding = B} <- mnesia:match_object(rabbit_reverse_route, reverse_route(Route), read)] - end). + end), + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_for_destination(DstName) ++ Filtered. implicit_bindings(VHostPath) -> DstQueues = rabbit_amqqueue:list_names(VHostPath), diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 0cd274b757..9afddd9b1e 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -580,19 +580,6 @@ credential_validator.regexp = ^abc\\d+", ]}], []}, - {kernel_inet_dist_listen_min, - "inet_dist_listen_min = 16000", - [{kernel, [ - {inet_dist_listen_min, 16000} - ]}], - []}, - {kernel_inet_dist_listen_max, - "inet_dist_listen_max = 16100", - [{kernel, [ - {inet_dist_listen_max, 16100} - ]}], - []}, - {log_syslog_settings, "log.syslog = true log.syslog.identity = rabbitmq diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 6722958973..c255949d67 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -93,23 +93,14 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(quorum_queue, Config) -> - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> rabbit_ct_helpers:set_config( Config, [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, {queue_durable, true}]); - false -> - {skip, "Quorum queues are unsupported"} + Skip -> + Skip end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 3bdf7bb009..023f55648b 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -61,7 +61,7 @@ groups() -> promote_on_shutdown, promote_on_failure, slave_recovers_after_vhost_failure, - slave_recovers_after_vhost_down_an_up, + slave_recovers_after_vhost_down_and_up, master_migrates_on_vhost_down, slave_recovers_after_vhost_down_and_master_migrated, queue_survive_adding_dead_vhost_mirror @@ -133,7 +133,7 @@ change_policy(Config) -> %% When we first declare a queue with no policy, it's not HA. amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME}), - timer:sleep(100), + timer:sleep(200), assert_slaves(A, ?QNAME, {A, ''}), %% Give it policy "all", it becomes HA and gets all mirrors @@ -417,7 +417,7 @@ slave_recovers_after_vhost_failure(Config) -> ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = <<"slave_recovers_after_vhost_failure-q">>, amqp_channel:call(ACh, #'queue.declare'{queue = QName}), - timer:sleep(300), + timer:sleep(500), assert_slaves(A, QName, {A, [B]}, [{A, []}]), %% Crash vhost on a node hosting a mirror @@ -426,22 +426,25 @@ slave_recovers_after_vhost_failure(Config) -> assert_slaves(A, QName, {A, [B]}, [{A, []}]). -slave_recovers_after_vhost_down_an_up(Config) -> +slave_recovers_after_vhost_down_and_up(Config) -> [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:set_ha_policy_all(Config), ACh = rabbit_ct_client_helpers:open_channel(Config, A), - QName = <<"slave_recovers_after_vhost_down_an_up-q">>, + QName = <<"slave_recovers_after_vhost_down_and_up-q">>, amqp_channel:call(ACh, #'queue.declare'{queue = QName}), - timer:sleep(100), + timer:sleep(200), assert_slaves(A, QName, {A, [B]}, [{A, []}]), %% Crash vhost on a node hosting a mirror rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>), - %% Vhost is down now - false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]), - timer:sleep(300), + %% rabbit_ct_broker_helpers:force_vhost_failure/2 will retry up to 10 times to + %% make sure that the top vhost supervision tree process did go down. MK. + timer:sleep(500), %% Vhost is back up - {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]), + case rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of + {ok, _Sup} -> ok; + {error,{already_started, _Sup}} -> ok + end, assert_slaves(A, QName, {A, [B]}, [{A, []}]). @@ -451,12 +454,12 @@ master_migrates_on_vhost_down(Config) -> ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = <<"master_migrates_on_vhost_down-q">>, amqp_channel:call(ACh, #'queue.declare'{queue = QName}), - timer:sleep(100), + timer:sleep(200), assert_slaves(A, QName, {A, [B]}, [{A, []}]), %% Crash vhost on the node hosting queue master rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), - timer:sleep(300), + timer:sleep(500), assert_slaves(A, QName, {B, []}). slave_recovers_after_vhost_down_and_master_migrated(Config) -> @@ -465,16 +468,19 @@ slave_recovers_after_vhost_down_and_master_migrated(Config) -> ACh = rabbit_ct_client_helpers:open_channel(Config, A), QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>, amqp_channel:call(ACh, #'queue.declare'{queue = QName}), - timer:sleep(100), + timer:sleep(200), assert_slaves(A, QName, {A, [B]}, [{A, []}]), %% Crash vhost on the node hosting queue master rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>), - timer:sleep(300), + timer:sleep(500), assert_slaves(B, QName, {B, []}), %% Restart the vhost on the node (previously) hosting queue master - {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]), - timer:sleep(300), + case rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]) of + {ok, _Sup} -> ok; + {error,{already_started, _Sup}} -> ok + end, + timer:sleep(500), assert_slaves(B, QName, {B, [A]}, [{B, []}]). random_policy(Config) -> @@ -569,7 +575,7 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Att State -> ct:pal("Waiting to leave state ~p~n Waiting for ~p~n", [State, {ExpMNode, ExpSNodes}]), - timer:sleep(100), + timer:sleep(200), assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts - 1) diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index 89344af30c..c59ddcc1fd 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -87,21 +87,12 @@ init_per_testcase(Testcase, Config) -> Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config2, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config2, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of + ok -> Config2; - false -> + Skip -> end_per_testcase(Testcase, Config2), - {skip, "Quorum queues are unsupported"} + Skip end. end_per_testcase(Testcase, Config) -> diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index aa15f9e420..c0c7c3f973 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -74,23 +74,14 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> rabbit_ct_helpers:set_config( Config, [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, {queue_durable, true}]); - false -> - {skip, "Quorum queues are unsupported"} + Skip -> + Skip end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index 1290c97b23..8d226a4327 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -88,10 +88,15 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + Skip -> + Skip + end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index e9f49039ed..43a353d0ea 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -151,17 +151,8 @@ init_per_group(Group, Config) -> Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config2, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config2, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of + ok -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, [rabbit, channel_queue_cleanup_interval, 100]), @@ -174,9 +165,9 @@ init_per_group(Group, Config) -> _ -> Config2 end; - false -> + Skip -> end_per_group(Group, Config2), - {skip, "Quorum queues are unsupported"} + Skip end. end_per_group(clustered, Config) -> @@ -206,21 +197,12 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ rabbit_ct_client_helpers:setup_steps() ++ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, fun rabbit_ct_broker_helpers:cluster_nodes/1]), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config3, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config3, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config3, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config3, quorum_queue) of + ok -> Config3; - false -> + Skip -> end_per_testcase(Testcase, Config3), - {skip, "Quorum queues are unsupported"} + Skip end; init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 9518e06196..680a576554 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -98,23 +98,14 @@ init_per_group(max_length_classic, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(max_length_quorum, Config) -> - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> rabbit_ct_helpers:set_config( Config, [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, {queue_durable, true}]); - false -> - {skip, "Quorum queues are unsupported"} + Skip -> + Skip end; init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, |
