summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-02-12 01:13:38 +0300
committerMichael Klishin <mklishin@pivotal.io>2019-02-12 01:13:38 +0300
commitf3cd4af7924765b6210dbac064dc4ecd1a5f6c95 (patch)
treeef9c9f16d81c67dc0df72ff4906b3ecae0489c53
parent7afb62dd97400511d5171c7d468d7250e255d3ab (diff)
parent2d2e79ea9cbebd8ae25ce77131b01ddfd8b59b53 (diff)
downloadrabbitmq-server-git-f3cd4af7924765b6210dbac064dc4ecd1a5f6c95.tar.gz
Merge branch 'master' into queues-testing
-rw-r--r--docs/rabbitmq.conf.example12
-rw-r--r--priv/schema/rabbit.schema7
-rw-r--r--src/rabbit_feature_flags.erl2
-rw-r--r--src/rabbit_fifo.erl2
-rw-r--r--src/rabbit_fifo_client.erl17
-rw-r--r--src/rabbit_quorum_queue.erl36
-rw-r--r--test/dead_lettering_SUITE.erl22
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl22
-rw-r--r--test/rabbit_fifo_SUITE.erl2
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl10
-rw-r--r--test/unit_SUITE.erl5
-rw-r--r--test/unit_log_config_SUITE.erl24
12 files changed, 110 insertions, 51 deletions
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index b82956a267..42aea4acf5 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -38,19 +38,19 @@
## and TLS listeners.
##
# num_acceptors.tcp = 10
-# num_acceptors.ssl = 1
+# num_acceptors.ssl = 10
-## Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
-## and TLS handshake), in milliseconds.
+## Maximum amount of time allowed for the AMQP 0-9-1 and AMQP 1.0 handshake
+## (performed after socket connection and TLS handshake) to complete, in milliseconds.
##
# handshake_timeout = 10000
## Set to 'true' to perform reverse DNS lookups when accepting a
-## connection. Hostnames will then be shown instead of IP addresses
-## in rabbitmqctl and the management plugin.
+## connection. rabbitmqctl and management UI will then display hostnames
+## instead of IP addresses. Default value is `false`.
##
-# reverse_dns_lookups = true
+# reverse_dns_lookups = false
##
## Security, Access Control
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 9bccb9a89e..f24e74c9ef 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -1332,6 +1332,13 @@ end}.
{datatype, string}
]}.
+{mapping, "log.ra.level", "rabbit.log.categories.ra.level", [
+ {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
+]}.
+{mapping, "log.ra.file", "rabbit.log.categories.ra.file", [
+ {datatype, string}
+]}.
+
{mapping, "log.default.level", "rabbit.log.categories.default.level", [
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
]}.
diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl
index e5d59b1a47..8ba38179a5 100644
--- a/src/rabbit_feature_flags.erl
+++ b/src/rabbit_feature_flags.erl
@@ -1132,7 +1132,7 @@ try_to_write_enabled_feature_flags_list(FeatureNames) ->
enabled_feature_flags_list_file() ->
case application:get_env(rabbit, feature_flags_file) of
{ok, Val} -> Val;
- _ -> filename:join([rabbit_mnesia:dir(), "feature_flags"])
+ undefined -> throw(feature_flags_file_not_set)
end.
%% -------------------------------------------------------------------
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 58e91f3d19..2580c1bfc0 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -659,7 +659,7 @@ tick(_Ts, #state{name = Name,
EnqueueBytes,
CheckoutBytes},
[{mod_call, rabbit_quorum_queue,
- update_metrics, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 5cc73b5d63..ef0991f675 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -73,7 +73,6 @@
{maybe(term()), rabbit_fifo:command()}},
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
#consumer{}},
- priority = normal :: normal | low,
block_handler = fun() -> ok end :: fun(() -> ok),
unblock_handler = fun() -> ok end :: fun(() -> ok),
timeout :: non_neg_integer()
@@ -710,7 +709,6 @@ consumer_id(ConsumerTag) ->
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
- priority = Priority,
soft_limit = SftLmt} = State0) ->
{Seq, State} = next_seq(State0),
ok = ra:pipeline_command(Server, Command, Seq, Priority),
@@ -719,20 +717,7 @@ send_command(Server, Correlation, Command, Priority,
false -> ok
end,
{Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
- priority = Priority,
- slow = Tag == slow}};
-%% once a low priority command has been sent it's not possible to then
-%% send a normal priority command without risking that commands are
-%% re-ordered. From an AMQP 0.9.1 point of view this should only affect
-%% channels that _both_ publish and consume as the enqueue operation is the
-%% only low priority one that is sent.
-send_command(Node, Correlation, Command, normal,
- #state{priority = low} = State) ->
- send_command(Node, Correlation, Command, low, State);
-send_command(Node, Correlation, Command, low,
- #state{priority = normal} = State) ->
- send_command(Node, Correlation, Command, low,
- State#state{priority = low}).
+ slow = Tag == slow}}.
resend_command(Node, Correlation, Command,
#state{pending = Pending} = State0) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 8a6d7eee5b..d0d464cda3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, update_metrics/2]).
+-export([become_leader/2, handle_tick/2]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -62,8 +62,9 @@
single_active_consumer_ctag
]).
--define(TICK_TIME, 1000). %% the ra server tick time
--define(DELETE_TIMEOUT, 5000). %% the ra server tick time
+-define(RPC_TIMEOUT, 1000).
+-define(TICK_TIMEOUT, 5000). %% the ra server tick time
+-define(DELETE_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
@@ -124,6 +125,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
friendly_name => FName,
initial_members => ServerIds,
log_init_args => #{uid => UId},
+ tick_timeout => ?TICK_TIMEOUT,
machine => RaMachine}
end || ServerId <- ServerIds],
@@ -220,7 +222,7 @@ become_leader(QName, Name) ->
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = amqqueue:get_quorum_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
- [QName], ?TICK_TIME)
+ [QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -232,7 +234,7 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
_ = spawn(fun() ->
@@ -255,8 +257,26 @@ update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_unacknowledged, MU},
{reductions, R}])
end),
+ ok = repair_leader_record(QName),
ok.
+repair_leader_record(QName) ->
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ Node = node(),
+ case amqqueue:get_pid(Q) of
+ {_, Node} ->
+ %% it's ok - we don't need to do anything
+ ok;
+ _ ->
+ rabbit_log:debug("~s: repairing leader record",
+ [rabbit_misc:rs(QName)]),
+ {_, Name} = erlang:process_info(self(), registered_name),
+ become_leader(QName, Name)
+ end,
+ ok.
+
+
+
reductions(Name) ->
try
{reductions, R} = process_info(whereis(Name), reductions),
@@ -343,7 +363,7 @@ delete(Q,
end,
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- ?TICK_TIME),
+ ?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -824,7 +844,7 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
{badrpc, _} -> down;
State -> State
end;
@@ -899,7 +919,7 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
-spec quorum_messages(atom()) -> non_neg_integer().
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index f2241f5f6f..72c72b096c 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -93,10 +93,24 @@ init_per_group(classic_queue, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(quorum_queue, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ false ->
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index 45484dd92b..aa15f9e420 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -74,10 +74,24 @@ init_per_group(classic_queue, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ false ->
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index b8f6a110b3..293a597d14 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -52,7 +52,7 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
- meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 87d076f694..3e0c474f2e 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -17,6 +17,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
-export([all/0
,groups/0
@@ -166,14 +167,7 @@ assert_ctl_queues(Config, Node, Args, Expected0) ->
Expected = lists:sort(Expected0),
Got0 = run_list_queues(Config, Node, Args),
Got = lists:sort(lists:map(fun hd/1, Got0)),
- case Got of
- Expected ->
- ok;
- _ ->
- ct:pal(error, "Listing queues on node ~p failed. Expected:~n~p~n~nGot:~n~p~n~n",
- [Node, Expected, Got]),
- exit({list_queues_unexpected_on, Node, Expected, Got})
- end.
+ ?assertMatch(Expected, Got).
run_list_queues(Config, Node, Args) ->
rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name", "--no-table-headers"]).
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index 1c3b453c04..7a41e419e9 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -80,6 +80,7 @@ init_per_testcase(TC, Config) when TC =:= decrypt_start_app;
TC =:= decrypt_start_app_file;
TC =:= decrypt_start_app_undefined ->
application:load(rabbit),
+ application:set_env(rabbit, feature_flags_file, ""),
Config;
init_per_testcase(_Testcase, Config) ->
Config.
@@ -262,7 +263,7 @@ decrypt_start_app_undefined(Config) ->
rabbit:start_apps([rabbit_shovel_test], #{rabbit => temporary})
catch
exit:{bad_configuration, config_entry_decoder} -> ok;
- _:_ -> exit(unexpected_exception)
+ _:Exception -> exit({unexpected_exception, Exception})
end.
decrypt_start_app_wrong_passphrase(Config) ->
@@ -282,7 +283,7 @@ decrypt_start_app_wrong_passphrase(Config) ->
rabbit:start_apps([rabbit_shovel_test], #{rabbit => temporary})
catch
exit:{decryption_error,_,_} -> ok;
- _:_ -> exit(unexpected_exception)
+ _:Exception -> exit({unexpected_exception, Exception})
end.
rabbitmqctl_encode(_Config) ->
diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl
index e49632d06d..584aa76760 100644
--- a/test/unit_log_config_SUITE.erl
+++ b/test/unit_log_config_SUITE.erl
@@ -125,6 +125,10 @@ sink_rewrite_sinks() ->
{rabbit_log_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
@@ -210,6 +214,10 @@ sink_handlers_merged_with_lager_extra_sinks_handlers(_) ->
{rabbit_log_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
@@ -287,6 +295,10 @@ level_sinks() ->
{rabbit_log_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,error]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,error]}]}]},
@@ -377,6 +389,10 @@ file_sinks() ->
{rabbit_log_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
@@ -610,6 +626,10 @@ default_expected_sinks(UpgradeFile) ->
{rabbit_log_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},
@@ -674,6 +694,10 @@ tty_expected_sinks() ->
{rabbit_log_lager_event,
[{handlers, [{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers, [{lager_forwarder_backend,[lager_event,inherit]}]}]},
+ {rabbit_log_ldap_lager_event,
+ [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
+ {rabbit_handlers,
+ [{lager_forwarder_backend,[lager_event,inherit]}]}]},
{rabbit_log_mirroring_lager_event,
[{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},
{rabbit_handlers,[{lager_forwarder_backend,[lager_event,inherit]}]}]},