diff options
| -rw-r--r-- | docs/rabbitmq.conf.example | 12 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 7 | ||||
| -rw-r--r-- | src/rabbit_feature_flags.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 36 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 22 | ||||
| -rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 22 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 10 | ||||
| -rw-r--r-- | test/unit_SUITE.erl | 5 | ||||
| -rw-r--r-- | test/unit_log_config_SUITE.erl | 24 |
12 files changed, 110 insertions, 51 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index b82956a267..42aea4acf5 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -38,19 +38,19 @@ ## and TLS listeners. ## # num_acceptors.tcp = 10 -# num_acceptors.ssl = 1 +# num_acceptors.ssl = 10 -## Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection -## and TLS handshake), in milliseconds. +## Maximum amount of time allowed for the AMQP 0-9-1 and AMQP 1.0 handshake +## (performed after socket connection and TLS handshake) to complete, in milliseconds. ## # handshake_timeout = 10000 ## Set to 'true' to perform reverse DNS lookups when accepting a -## connection. Hostnames will then be shown instead of IP addresses -## in rabbitmqctl and the management plugin. +## connection. rabbitmqctl and management UI will then display hostnames +## instead of IP addresses. Default value is `false`. ## -# reverse_dns_lookups = true +# reverse_dns_lookups = false ## ## Security, Access Control diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 9bccb9a89e..f24e74c9ef 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -1332,6 +1332,13 @@ end}. {datatype, string} ]}. +{mapping, "log.ra.level", "rabbit.log.categories.ra.level", [ + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} +]}. +{mapping, "log.ra.file", "rabbit.log.categories.ra.file", [ + {datatype, string} +]}. + {mapping, "log.default.level", "rabbit.log.categories.default.level", [ {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} ]}. diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl index e5d59b1a47..8ba38179a5 100644 --- a/src/rabbit_feature_flags.erl +++ b/src/rabbit_feature_flags.erl @@ -1132,7 +1132,7 @@ try_to_write_enabled_feature_flags_list(FeatureNames) -> enabled_feature_flags_list_file() -> case application:get_env(rabbit, feature_flags_file) of {ok, Val} -> Val; - _ -> filename:join([rabbit_mnesia:dir(), "feature_flags"]) + undefined -> throw(feature_flags_file_not_set) end. %% ------------------------------------------------------------------- diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 58e91f3d19..2580c1bfc0 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -659,7 +659,7 @@ tick(_Ts, #state{name = Name, EnqueueBytes, CheckoutBytes}, [{mod_call, rabbit_quorum_queue, - update_metrics, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#state{consumers = Cons, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 5cc73b5d63..ef0991f675 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -73,7 +73,6 @@ {maybe(term()), rabbit_fifo:command()}}, consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() => #consumer{}}, - priority = normal :: normal | low, block_handler = fun() -> ok end :: fun(() -> ok), unblock_handler = fun() -> ok end :: fun(() -> ok), timeout :: non_neg_integer() @@ -710,7 +709,6 @@ consumer_id(ConsumerTag) -> send_command(Server, Correlation, Command, Priority, #state{pending = Pending, - priority = Priority, soft_limit = SftLmt} = State0) -> {Seq, State} = next_seq(State0), ok = ra:pipeline_command(Server, Command, Seq, Priority), @@ -719,20 +717,7 @@ send_command(Server, Correlation, Command, Priority, false -> ok end, {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, - priority = Priority, - slow = Tag == slow}}; -%% once a low priority command has been sent it's not possible to then -%% send a normal priority command without risking that commands are -%% re-ordered. From an AMQP 0.9.1 point of view this should only affect -%% channels that _both_ publish and consume as the enqueue operation is the -%% only low priority one that is sent. -send_command(Node, Correlation, Command, normal, - #state{priority = low} = State) -> - send_command(Node, Correlation, Command, low, State); -send_command(Node, Correlation, Command, low, - #state{priority = normal} = State) -> - send_command(Node, Correlation, Command, low, - State#state{priority = low}). + slow = Tag == slow}}. resend_command(Node, Correlation, Command, #state{pending = Pending} = State0) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 8a6d7eee5b..d0d464cda3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, update_metrics/2]). +-export([become_leader/2, handle_tick/2]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -62,8 +62,9 @@ single_active_consumer_ctag ]). --define(TICK_TIME, 1000). %% the ra server tick time --define(DELETE_TIMEOUT, 5000). %% the ra server tick time +-define(RPC_TIMEOUT, 1000). +-define(TICK_TIMEOUT, 5000). %% the ra server tick time +-define(DELETE_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> friendly_name => FName, initial_members => ServerIds, log_init_args => #{uid => UId}, + tick_timeout => ?TICK_TIMEOUT, machine => RaMachine} end || ServerId <- ServerIds], @@ -220,7 +222,7 @@ become_leader(QName, Name) -> {ok, Q0} when ?is_amqqueue(Q0) -> Nodes = amqqueue:get_quorum_nodes(Q0), [rpc:call(Node, ?MODULE, rpc_delete_metrics, - [QName], ?TICK_TIME) + [QName], ?RPC_TIMEOUT) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -232,7 +234,7 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> %% this makes calls to remote processes so cannot be run inside the %% ra server _ = spawn(fun() -> @@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_unacknowledged, MU}, {reductions, R}]) end), + ok = repair_leader_record(QName), ok. +repair_leader_record(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + Node = node(), + case amqqueue:get_pid(Q) of + {_, Node} -> + %% it's ok - we don't need to do anything + ok; + _ -> + rabbit_log:debug("~s: repairing leader record", + [rabbit_misc:rs(QName)]), + {_, Name} = erlang:process_info(self(), registered_name), + become_leader(QName, Name) + end, + ok. + + + reductions(Name) -> try {reductions, R} = process_info(whereis(Name), reductions), @@ -343,7 +363,7 @@ delete(Q, end, ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?TICK_TIME), + ?RPC_TIMEOUT), {ok, ReadyMsgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) -> i(state, Q) when ?is_amqqueue(Q) -> {Name, Node} = amqqueue:get_pid(Q), %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of {badrpc, _} -> down; State -> State end; @@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). -spec quorum_messages(atom()) -> non_neg_integer(). diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index f2241f5f6f..72c72b096c 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -93,10 +93,24 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(quorum_queue, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + false -> + {skip, "Quorum queues are unsupported"} + end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index 45484dd92b..aa15f9e420 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -74,10 +74,24 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + false -> + {skip, "Quorum queues are unsupported"} + end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index b8f6a110b3..293a597d14 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -52,7 +52,7 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), - meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(), diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index 87d076f694..3e0c474f2e 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -17,6 +17,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -export([all/0 ,groups/0 @@ -166,14 +167,7 @@ assert_ctl_queues(Config, Node, Args, Expected0) -> Expected = lists:sort(Expected0), Got0 = run_list_queues(Config, Node, Args), Got = lists:sort(lists:map(fun hd/1, Got0)), - case Got of - Expected -> - ok; - _ -> - ct:pal(error, "Listing queues on node ~p failed. Expected:~n~p~n~nGot:~n~p~n~n", - [Node, Expected, Got]), - exit({list_queues_unexpected_on, Node, Expected, Got}) - end. + ?assertMatch(Expected, Got). run_list_queues(Config, Node, Args) -> rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name", "--no-table-headers"]). diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl index 1c3b453c04..7a41e419e9 100644 --- a/test/unit_SUITE.erl +++ b/test/unit_SUITE.erl @@ -80,6 +80,7 @@ init_per_testcase(TC, Config) when TC =:= decrypt_start_app; TC =:= decrypt_start_app_file; TC =:= decrypt_start_app_undefined -> application:load(rabbit), + application:set_env(rabbit, feature_flags_file, ""), Config; init_per_testcase(_Testcase, Config) -> Config. @@ -262,7 +263,7 @@ decrypt_start_app_undefined(Config) -> rabbit:start_apps([rabbit_shovel_test], #{rabbit => temporary}) catch exit:{bad_configuration, config_entry_decoder} -> ok; - _:_ -> exit(unexpected_exception) + _:Exception -> exit({unexpected_exception, Exception}) end. decrypt_start_app_wrong_passphrase(Config) -> @@ -282,7 +283,7 @@ decrypt_start_app_wrong_passphrase(Config) -> rabbit:start_apps([rabbit_shovel_test], #{rabbit => temporary}) catch exit:{decryption_error,_,_} -> ok; - _:_ -> exit(unexpected_exception) + _:Exception -> exit({unexpected_exception, Exception}) end. rabbitmqctl_encode(_Config) -> diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl index e49632d06d..584aa76760 100644 --- a/test/unit_log_config_SUITE.erl +++ b/test/unit_log_config_SUITE.erl @@ -125,6 +125,10 @@ sink_rewrite_sinks() -> {rabbit_log_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, @@ -210,6 +214,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) -> {rabbit_log_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, @@ -287,6 +295,10 @@ level_sinks() -> {rabbit_log_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,error]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]}, @@ -377,6 +389,10 @@ file_sinks() -> {rabbit_log_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, @@ -610,6 +626,10 @@ default_expected_sinks(UpgradeFile) -> {rabbit_log_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, @@ -674,6 +694,10 @@ tty_expected_sinks() -> {rabbit_log_lager_event, [{handlers, [{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers, [{lager_forwarder_backend,[lager_event,inherit]}]}]}, + {rabbit_log_ldap_lager_event, + [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, + {rabbit_handlers, + [{lager_forwarder_backend,[lager_event,inherit]}]}]}, {rabbit_log_mirroring_lager_event, [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}, {rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]}, |
