diff options
36 files changed, 1117 insertions, 2393 deletions
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults index fdcf624d1b..a4d78e6986 100755 --- a/scripts/rabbitmq-defaults +++ b/scripts/rabbitmq-defaults @@ -47,7 +47,7 @@ PLUGINS_DIR="${RABBITMQ_HOME}/plugins" # RABBIT_HOME can contain a version number, so default plugins # directory can be hard to find if we want to package some plugin # separately. When RABBITMQ_HOME points to a standard location where -# it's usally being installed by package managers, we add +# it's usually being installed by package managers, we add # "/usr/lib/rabbitmq/plugins" to plugin search path. case "$RABBITMQ_HOME" in /usr/lib/rabbitmq/*) diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index d3014ecd66..f1624eddf9 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -220,6 +220,10 @@ rmq_normalize_path_var RABBITMQ_PID_FILE [ "x" = "x$RABBITMQ_BOOT_MODULE" ] && RABBITMQ_BOOT_MODULE=${BOOT_MODULE} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand +rmq_normalize_path_var RABBITMQ_PLUGINS_EXPAND_DIR + [ "x" != "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE_source=environment [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} rmq_normalize_path_var RABBITMQ_ENABLED_PLUGINS_FILE @@ -232,9 +236,11 @@ rmq_normalize_path_var RABBITMQ_PLUGINS_DIR [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} [ "x" != "x$RABBITMQ_LOGS" ] && export RABBITMQ_LOGS_source=environment [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" +[ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log" -rmq_normalize_path_var \ - RABBITMQ_LOGS +rmq_normalize_path_var RABBITMQ_LOGS + +rmq_normalize_path_var RABBITMQ_UPGRADE_LOG [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} @@ -247,9 +253,11 @@ rmq_check_if_shared_with_mnesia \ RABBITMQ_CONFIG_FILE \ RABBITMQ_LOG_BASE \ RABBITMQ_PID_FILE \ + RABBITMQ_PLUGINS_EXPAND_DIR \ RABBITMQ_ENABLED_PLUGINS_FILE \ RABBITMQ_PLUGINS_DIR \ - RABBITMQ_LOGS + RABBITMQ_LOGS \ + RABBITMQ_UPGRADE_LOG ##--- End of overridden <var_name> variables diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index 9426019003..56b2f69b2d 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -262,6 +262,20 @@ if "!RABBITMQ_BOOT_MODULE!"=="" ( )
)
+REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
+REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ if "!PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!\!RABBITMQ_NODENAME!-plugins-expand
+ ) else (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR!
+ )
+)
+REM FIXME: RabbitMQ removes and recreates RABBITMQ_PLUGINS_EXPAND_DIR
+REM itself. Therefore we can't create it here in advance and escape the
+REM directory name, and RABBITMQ_PLUGINS_EXPAND_DIR must not contain
+REM non-US-ASCII characters.
+
REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
if "!ENABLED_PLUGINS_FILE!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index ce731516d4..41d1a81332 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -155,14 +155,16 @@ RABBITMQ_LISTEN_ARG= [ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]" # If $RABBITMQ_LOGS is '-', send all log messages to stdout. This is -# particularily useful for Docker images. +# particularly useful for Docker images. if [ "$RABBITMQ_LOGS" = '-' ]; then SASL_ERROR_LOGGER=tty RABBIT_LAGER_HANDLER=tty + RABBITMQ_LAGER_HANDLER_UPGRADE=tty else SASL_ERROR_LOGGER=false RABBIT_LAGER_HANDLER='"'${RABBITMQ_LOGS}'"' + RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"' fi # Bump ETS table limit to 50000 @@ -216,8 +218,10 @@ start_rabbitmq_server() { -sasl sasl_error_logger "$SASL_ERROR_LOGGER" \ -rabbit lager_log_root "\"$RABBITMQ_LOG_BASE\"" \ -rabbit lager_handler "$RABBIT_LAGER_HANDLER" \ + -rabbit lager_handler_upgrade "$RABBITMQ_LAGER_HANDLER_UPGRADE" \ -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ -os_mon start_cpu_sup false \ -os_mon start_disksup false \ -os_mon start_memsup false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5e9b3667bc..1fc872cd9a 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -119,7 +119,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( )
REM If $RABBITMQ_LOGS is '-', send all log messages to stdout. This is
-REM particularily useful for Docker images.
+REM particularly useful for Docker images.
if "!RABBITMQ_LOGS!" == "-" (
set SASL_ERROR_LOGGER=tty
@@ -174,6 +174,7 @@ if "!ENV_OK!"=="false" ( -rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 24a7ab128c..5022ec020a 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -251,6 +251,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit windows_service_config \""!RABBITMQ_CONFIG_FILE:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
diff --git a/src/rabbit.erl b/src/rabbit.erl index d895d57ff2..e121fb3e2e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -153,6 +153,14 @@ {requires, core_initialized}, {enables, routing_ready}]}). +-rabbit_boot_step({upgrade_queues, + [{description, "per-vhost message store migration"}, + {mfa, {rabbit_upgrade, + maybe_migrate_queues_to_per_vhost_storage, + []}}, + {requires, [core_initialized]}, + {enables, recovery}]}). + -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index dd64c6f1c8..daf2c167fa 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -16,7 +16,7 @@ %% There are two types of alarms handled by this module: %% %% * per-node resource (disk, memory) alarms for the whole cluster. If any node -%% has an alarm, then all publishing should be disabled througout the +%% has an alarm, then all publishing should be disabled across the %% cluster until all alarms clear. When a node sets such an alarm, %% this information is automatically propagated throughout the cluster. %% `#alarms.alarmed_nodes' is being used to track this type of alarms. diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index 65e8563ddf..f6d005be90 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -154,7 +154,7 @@ start_distribution_anon(TriesLeft, _) -> start_distribution_anon(TriesLeft - 1, Reason) end. -%% Tries to start distribution with random name choosen from limited list of candidates - to +%% Tries to start distribution with random name chosen from limited list of candidates - to %% prevent atom table pollution on target nodes. start_distribution() -> rabbit_nodes:ensure_epmd(), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl deleted file mode 100644 index d96c1dd476..0000000000 --- a/src/rabbit_control_main.erl +++ /dev/null @@ -1,1110 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_control_main). --include("rabbit.hrl"). --include("rabbit_cli.hrl"). --include("rabbit_misc.hrl"). - --export([start/0, stop/0, parse_arguments/2, action/5, action/6, - sync_queue/1, cancel_sync_queue/1, become/1, - purge_queue/1]). - --import(rabbit_misc, [rpc_call/4, rpc_call/5]). - --define(EXTERNAL_CHECK_INTERVAL, 1000). - --define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node), ?TIMEOUT_DEF]). - --define(COMMANDS, - [stop, - stop_app, - start_app, - wait, - reset, - force_reset, - rotate_logs, - hipe_compile, - - {join_cluster, [?RAM_DEF]}, - change_cluster_node_type, - update_cluster_nodes, - {forget_cluster_node, [?OFFLINE_DEF]}, - rename_cluster_node, - force_boot, - cluster_status, - {sync_queue, [?VHOST_DEF]}, - {cancel_sync_queue, [?VHOST_DEF]}, - {purge_queue, [?VHOST_DEF]}, - - add_user, - delete_user, - change_password, - clear_password, - authenticate_user, - set_user_tags, - list_users, - - add_vhost, - delete_vhost, - list_vhosts, - {set_permissions, [?VHOST_DEF]}, - {clear_permissions, [?VHOST_DEF]}, - {list_permissions, [?VHOST_DEF]}, - list_user_permissions, - - {set_parameter, [?VHOST_DEF]}, - {clear_parameter, [?VHOST_DEF]}, - {list_parameters, [?VHOST_DEF]}, - - set_global_parameter, - clear_global_parameter, - list_global_parameters, - - {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, - {clear_policy, [?VHOST_DEF]}, - {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, - {clear_operator_policy, [?VHOST_DEF]}, - {list_policies, [?VHOST_DEF]}, - {list_operator_policies, [?VHOST_DEF]}, - - {set_vhost_limits, [?VHOST_DEF]}, - {clear_vhost_limits, [?VHOST_DEF]}, - {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]}, - {list_exchanges, [?VHOST_DEF]}, - {list_bindings, [?VHOST_DEF]}, - {list_connections, [?VHOST_DEF]}, - list_channels, - {list_consumers, [?VHOST_DEF]}, - status, - environment, - report, - set_cluster_name, - eval, - node_health_check, - - close_connection, - {trace_on, [?VHOST_DEF]}, - {trace_off, [?VHOST_DEF]}, - set_vm_memory_high_watermark, - set_disk_free_limit, - help, - {encode, [?DECODE_DEF, ?CIPHER_DEF, ?HASH_DEF, ?ITERATIONS_DEF, ?LIST_CIPHERS_DEF, ?LIST_HASHES_DEF]} - ]). - --define(GLOBAL_QUERIES, - [{"Connections", rabbit_networking, connection_info_all, - connection_info_keys}, - {"Channels", rabbit_channel, info_all, info_keys}]). - --define(VHOST_QUERIES, - [{"Queues", rabbit_amqqueue, info_all, info_keys}, - {"Exchanges", rabbit_exchange, info_all, info_keys}, - {"Bindings", rabbit_binding, info_all, info_keys}, - {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys}, - {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions, - vhost_perms_info_keys}, - {"Policies", rabbit_policy, list_formatted, info_keys}, - {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). - --define(COMMANDS_NOT_REQUIRING_APP, - [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, - join_cluster, change_cluster_node_type, update_cluster_nodes, - forget_cluster_node, rename_cluster_node, cluster_status, status, - environment, eval, force_boot, help, hipe_compile, encode]). - -%% [Command | {Command, DefaultTimeoutInMilliSeconds}] --define(COMMANDS_WITH_TIMEOUT, - [list_user_permissions, list_policies, list_queues, list_exchanges, - list_bindings, list_connections, list_channels, list_consumers, - list_vhosts, list_parameters, list_global_parameters, - purge_queue, - {node_health_check, 70000}]). - -%%---------------------------------------------------------------------------- - --spec start() -> no_return(). --spec stop() -> 'ok'. --spec action - (atom(), node(), [string()], [{string(), any()}], - fun ((string(), [any()]) -> 'ok')) -> - 'ok'. - --spec action - (atom(), node(), [string()], [{string(), any()}], - fun ((string(), [any()]) -> 'ok'), timeout()) -> - 'ok'. - -%%---------------------------------------------------------------------------- - -start() -> - rabbit_cli:main( - fun (Args, NodeStr) -> - parse_arguments(Args, NodeStr) - end, - fun (Command, Node, Args, Opts) -> - Quiet = proplists:get_bool(?QUIET_OPT, Opts), - Inform = case Quiet of - true -> fun (_Format, _Args1) -> ok end; - false -> fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end - end, - try - T = case get_timeout(Command, Opts) of - {ok, Timeout} -> - Timeout; - {error, _} -> - %% since this is an error with user input, ignore the quiet - %% setting - io:format("Failed to parse provided timeout value, using ~s~n", [?RPC_TIMEOUT]), - ?RPC_TIMEOUT - end, - do_action(Command, Node, Args, Opts, Inform, T) - catch _:E -> E - end - end, rabbit_ctl_usage). - -parse_arguments(CmdLine, NodeStr) -> - rabbit_cli:parse_arguments( - ?COMMANDS, ?GLOBAL_DEFS(NodeStr), ?NODE_OPT, CmdLine). - -print_report(Node, {Descr, Module, InfoFun, KeysFun}) -> - io:format("~s:~n", [Descr]), - print_report0(Node, {Module, InfoFun, KeysFun}, []). - -print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) -> - io:format("~s on ~s:~n", [Descr, VHostArg]), - print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg). - -print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> - case rpc_call(Node, Module, InfoFun, VHostArg) of - [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []), - display_row([atom_to_list(I) || I <- InfoItems]), - display_info_list(Results, InfoItems); - _ -> ok - end, - io:nl(). - -get_timeout(Command, Opts) -> - Default = case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of - none -> - infinity; - {Command, true} -> - ?RPC_TIMEOUT; - {Command, D} -> - D - end, - Result = case proplists:get_value(?TIMEOUT_OPT, Opts, Default) of - use_default -> - parse_timeout(Default); - Value -> - parse_timeout(Value) - end, - Result. - - -parse_number(N) when is_list(N) -> - try list_to_integer(N) of - Val -> Val - catch error:badarg -> - %% could have been a float, give it - %% another shot - list_to_float(N) - end. - -parse_timeout("infinity") -> - {ok, infinity}; -parse_timeout(infinity) -> - {ok, infinity}; -parse_timeout(N) when is_list(N) -> - try parse_number(N) of - M -> - Y = case M >= 0 of - true -> round(M) * 1000; - false -> ?RPC_TIMEOUT - end, - {ok, Y} - catch error:badarg -> - {error, infinity} - end; -parse_timeout(N) -> - {ok, N}. - -announce_timeout(infinity, _Inform) -> - %% no-op - ok; -announce_timeout(Timeout, Inform) when is_number(Timeout) -> - Inform("Timeout: ~w seconds", [Timeout/1000]), - ok. - -stop() -> - ok. - -%%---------------------------------------------------------------------------- - -do_action(Command, Node, Args, Opts, Inform, Timeout) -> - case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of - false -> - case ensure_app_running(Node) of - ok -> - case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of - {Command, _} -> - announce_timeout(Timeout, Inform), - action(Command, Node, Args, Opts, Inform, Timeout); - none -> - action(Command, Node, Args, Opts, Inform) - end; - E -> E - end; - true -> - action(Command, Node, Args, Opts, Inform) - end. - -action(stop, Node, Args, _Opts, Inform) -> - Inform("Stopping and halting node ~p", [Node]), - Res = call(Node, {rabbit, stop_and_halt, []}), - case {Res, Args} of - {ok, [PidFile]} -> wait_for_process_death( - read_pid_file(PidFile, false)); - {ok, [_, _| _]} -> exit({badarg, Args}); - _ -> ok - end, - Res; - -action(stop_app, Node, [], _Opts, Inform) -> - Inform("Stopping rabbit application on node ~p", [Node]), - call(Node, {rabbit, stop, []}); - -action(start_app, Node, [], _Opts, Inform) -> - Inform("Starting node ~p", [Node]), - call(Node, {rabbit, start, []}); - -action(reset, Node, [], _Opts, Inform) -> - Inform("Resetting node ~p", [Node]), - require_mnesia_stopped(Node, - fun() -> - call(Node, {rabbit_mnesia, reset, []}) - end); - -action(force_reset, Node, [], _Opts, Inform) -> - Inform("Forcefully resetting node ~p", [Node]), - require_mnesia_stopped(Node, - fun() -> - call(Node, {rabbit_mnesia, force_reset, []}) - end); - -action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> - ClusterNode = list_to_atom(ClusterNodeS), - NodeType = case proplists:get_bool(?RAM_OPT, Opts) of - true -> ram; - false -> disc - end, - Inform("Clustering node ~p with ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, - fun() -> - rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType]) - end); - -action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> - Inform("Turning ~p into a ram node", [Node]), - require_mnesia_stopped(Node, - fun() -> - rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]) - end); -action(change_cluster_node_type, Node, [Type], _Opts, Inform) - when Type =:= "disc" orelse Type =:= "disk" -> - Inform("Turning ~p into a disc node", [Node]), - require_mnesia_stopped(Node, - fun() -> - rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]) - end); - -action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) -> - ClusterNode = list_to_atom(ClusterNodeS), - Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]), - require_mnesia_stopped(Node, - fun() -> - rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]) - end); - -action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> - ClusterNode = list_to_atom(ClusterNodeS), - RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), - Inform("Removing node ~p from cluster", [ClusterNode]), - case RemoveWhenOffline of - true -> become(Node), - rabbit_mnesia:forget_cluster_node(ClusterNode, true); - false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, - [ClusterNode, false]) - end; - -action(rename_cluster_node, Node, NodesS, _Opts, Inform) -> - Nodes = split_list([list_to_atom(N) || N <- NodesS]), - Inform("Renaming cluster nodes:~n~s~n", - [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) || - {F, T} <- Nodes])]), - rabbit_mnesia_rename:rename(Node, Nodes); - -action(force_boot, Node, [], _Opts, Inform) -> - Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), - case rabbit:is_running(Node) of - false -> rabbit_mnesia:force_load_next_boot(); - true -> {error, rabbit_running} - end; - -action(sync_queue, Node, [Q], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), - Inform("Synchronising ~s", [rabbit_misc:rs(QName)]), - rpc_call(Node, rabbit_control_main, sync_queue, [QName]); - -action(cancel_sync_queue, Node, [Q], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), - Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]), - rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]); - -action(wait, Node, [PidFile], _Opts, Inform) -> - Inform("Waiting for ~p", [Node]), - wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); -action(wait, Node, [PidFile, App], _Opts, Inform) -> - Inform("Waiting for ~p on ~p", [App, Node]), - wait_for_application(Node, PidFile, list_to_atom(App), Inform); - -action(status, Node, [], _Opts, Inform) -> - Inform("Status of node ~p", [Node]), - display_call_result(Node, {rabbit, status, []}); - -action(cluster_status, Node, [], _Opts, Inform) -> - Inform("Cluster status of node ~p", [Node]), - Status = unsafe_rpc(Node, rabbit_mnesia, status, []), - io:format("~p~n", [Status ++ [{alarms, - [alarms_by_node(Name) || Name <- nodes_in_cluster(Node)]}]]), - ok; - -action(environment, Node, _App, _Opts, Inform) -> - Inform("Application environment of node ~p", [Node]), - display_call_result(Node, {rabbit, environment, []}); - -action(rotate_logs, Node, [], _Opts, Inform) -> - Inform("Rotating logs for node ~p", [Node]), - call(Node, {rabbit, rotate_logs, []}); - -action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) -> - ok = application:load(rabbit), - case rabbit_hipe:can_hipe_compile() of - true -> - {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir), - ok; - false -> - {error, "HiPE compilation is not supported"} - end; - -action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> - Inform("Closing connection \"~s\"", [PidStr]), - rpc_call(Node, rabbit_networking, close_connection, - [rabbit_misc:string_to_pid(PidStr), Explanation]); - -action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> - Inform("Creating user \"~s\"", [Username]), - call(Node, {rabbit_auth_backend_internal, add_user, Args}); - -action(delete_user, Node, Args = [_Username], _Opts, Inform) -> - Inform("Deleting user \"~s\"", Args), - call(Node, {rabbit_auth_backend_internal, delete_user, Args}); - -action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> - Inform("Changing password for user \"~s\"", [Username]), - call(Node, {rabbit_auth_backend_internal, change_password, Args}); - -action(clear_password, Node, Args = [Username], _Opts, Inform) -> - Inform("Clearing password for user \"~s\"", [Username]), - call(Node, {rabbit_auth_backend_internal, clear_password, Args}); - -action(authenticate_user, Node, Args = [Username, _Password], _Opts, Inform) -> - Inform("Authenticating user \"~s\"", [Username]), - call(Node, {rabbit_access_control, check_user_pass_login, Args}); - -action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> - Tags = [list_to_atom(T) || T <- TagsStr], - Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), - rpc_call(Node, rabbit_auth_backend_internal, set_tags, - [list_to_binary(Username), Tags]); - -action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Creating vhost \"~s\"", Args), - call(Node, {rabbit_vhost, add, Args}); - -action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Deleting vhost \"~s\"", Args), - call(Node, {rabbit_vhost, delete, Args}); - -action(trace_on, Node, [], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Starting tracing for vhost \"~s\"", [VHost]), - rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); - -action(trace_off, Node, [], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Stopping tracing for vhost \"~s\"", [VHost]), - rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); - -action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> - Frac = list_to_float(case string:chr(Arg, $.) of - 0 -> Arg ++ ".0"; - _ -> Arg - end), - Inform("Setting memory threshold on ~p to ~p", [Node, Frac]), - rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); - -action(set_vm_memory_high_watermark, Node, ["absolute", Arg], _Opts, Inform) -> - case rabbit_resource_monitor_misc:parse_information_unit(Arg) of - {ok, Limit} -> - Inform("Setting memory threshold on ~p to ~p bytes", [Node, Limit]), - rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, - [{absolute, Limit}]); - {error, parse_error} -> - {error_string, rabbit_misc:format( - "Unable to parse absolute memory limit value ~p", [Arg])} - end; - -action(set_disk_free_limit, Node, [Arg], _Opts, Inform) -> - case rabbit_resource_monitor_misc:parse_information_unit(Arg) of - {ok, Limit} -> - Inform("Setting disk free limit on ~p to ~p bytes", [Node, Limit]), - rpc_call(Node, rabbit_disk_monitor, set_disk_free_limit, [Limit]); - {error, parse_error} -> - {error_string, rabbit_misc:format( - "Unable to parse disk free limit value ~p", [Arg])} - end; - -action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) -> - Frac = list_to_float(case string:chr(Arg, $.) of - 0 -> Arg ++ ".0"; - _ -> Arg - end), - Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]), - rpc_call(Node, - rabbit_disk_monitor, - set_disk_free_limit, - [{mem_relative, Frac}]); - - -action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Setting permissions for user \"~s\" in vhost \"~s\"", - [Username, VHost]), - call(Node, {rabbit_auth_backend_internal, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); - -action(clear_permissions, Node, [Username], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Clearing permissions for user \"~s\" in vhost \"~s\"", - [Username, VHost]), - call(Node, {rabbit_auth_backend_internal, clear_permissions, - [Username, VHost]}); - -action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Setting runtime parameter ~p for component ~p to ~p", - [Key, Component, Value]), - rpc_call( - Node, rabbit_runtime_parameters, parse_set, - [VHostArg, list_to_binary(Component), list_to_binary(Key), Value, none]); - -action(clear_parameter, Node, [Component, Key], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]), - rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg, - list_to_binary(Component), - list_to_binary(Key)]); - -action(set_global_parameter, Node, [Key, Value], _Opts, Inform) -> - Inform("Setting global runtime parameter ~p to ~p", [Key, Value]), - rpc_call( - Node, rabbit_runtime_parameters, parse_set_global, - [rabbit_data_coercion:to_atom(Key), rabbit_data_coercion:to_binary(Value)] - ); - -action(clear_global_parameter, Node, [Key], _Opts, Inform) -> - Inform("Clearing global runtime parameter ~p", [Key]), - rpc_call( - Node, rabbit_runtime_parameters, clear_global, - [rabbit_data_coercion:to_atom(Key)] - ); - -action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> - Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), - ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), - Inform(Msg, [Key, Pattern, Defn, PriorityArg]), - Res = rpc_call( - Node, rabbit_policy, parse_set, - [VHostArg, list_to_binary(Key), list_to_binary(Pattern), list_to_binary(Defn), list_to_binary(PriorityArg), ApplyToArg]), - case Res of - {error, Format, Args} when is_list(Format) andalso is_list(Args) -> - {error_string, rabbit_misc:format(Format, Args)}; - _ -> - Res - end; - -action(clear_policy, Node, [Key], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Clearing policy ~p", [Key]), - rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); - -action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> - Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p", - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), - ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), - Inform(Msg, [Key, Pattern, Defn, PriorityArg]), - Res = rpc_call( - Node, rabbit_policy, parse_set_op, - [VHostArg, list_to_binary(Key), list_to_binary(Pattern), list_to_binary(Defn), list_to_binary(PriorityArg), ApplyToArg]), - case Res of - {error, Format, Args} when is_list(Format) andalso is_list(Args) -> - {error_string, rabbit_misc:format(Format, Args)}; - _ -> - Res - end; - -action(clear_operator_policy, Node, [Key], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Clearing operator policy ~p", [Key]), - rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]); - -action(set_vhost_limits, Node, [Defn], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Setting vhost limits for vhost ~p", [VHostArg]), - rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]), - ok; - -action(clear_vhost_limits, Node, [], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Clearing vhost ~p limits", [VHostArg]), - rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]); - -action(report, Node, _Args, _Opts, Inform) -> - Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), - [begin ok = action(Action, N, [], [], Inform), io:nl() end || - N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]), - Action <- [status, cluster_status, environment]], - VHosts = unsafe_rpc(Node, rabbit_vhost, list, []), - [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES], - [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts], - ok; - -action(set_cluster_name, Node, [Name], _Opts, Inform) -> - Inform("Setting cluster name to ~s", [Name]), - rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]); - -action(eval, Node, [Expr], _Opts, _Inform) -> - case erl_scan:string(Expr) of - {ok, Scanned, _} -> - case erl_parse:parse_exprs(Scanned) of - {ok, Parsed} -> {value, Value, _} = - unsafe_rpc( - Node, erl_eval, exprs, [Parsed, []]), - io:format("~p~n", [Value]), - ok; - {error, E} -> {error_string, format_parse_error(E)} - end; - {error, E, _} -> - {error_string, format_parse_error(E)} - end; - -action(help, _Node, _Args, _Opts, _Inform) -> - io:format("~s", [rabbit_ctl_usage:usage()]); - -action(encode, _Node, Args, Opts, _Inform) -> - ListCiphers = lists:member({?LIST_CIPHERS_OPT, true}, Opts), - ListHashes = lists:member({?LIST_HASHES_OPT, true}, Opts), - Decode = lists:member({?DECODE_OPT, true}, Opts), - Cipher = list_to_atom(proplists:get_value(?CIPHER_OPT, Opts)), - Hash = list_to_atom(proplists:get_value(?HASH_OPT, Opts)), - Iterations = list_to_integer(proplists:get_value(?ITERATIONS_OPT, Opts)), - - {_, Msg} = rabbit_control_pbe:encode(ListCiphers, ListHashes, Decode, Cipher, Hash, Iterations, Args), - io:format(Msg ++ "~n"); - -action(Command, Node, Args, Opts, Inform) -> - %% For backward compatibility, run commands accepting a timeout with - %% the default timeout. - action(Command, Node, Args, Opts, Inform, ?RPC_TIMEOUT). - -action(purge_queue, _Node, [], _Opts, _Inform, _Timeout) -> - {error, "purge_queue takes queue name as an argument"}; - -action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), - Inform("Purging ~s", [rabbit_misc:rs(QRes)]), - rpc_call(Node, rabbit_control_main, purge_queue, [QRes], Timeout); - -action(list_users, Node, [], _Opts, Inform, Timeout) -> - Inform("Listing users", []), - call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, - rabbit_auth_backend_internal:user_info_keys(), - [{timeout, Timeout}, to_bin_utf8]); - -action(list_permissions, Node, [], Opts, Inform, Timeout) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost \"~s\"", [VHost]), - call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, - rabbit_auth_backend_internal:vhost_perms_info_keys(), - [{timeout, Timeout}, to_bin_utf8, is_escaped]); - -action(list_parameters, Node, [], Opts, Inform, Timeout) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing runtime parameters", []), - call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, - rabbit_runtime_parameters:info_keys(), - [{timeout, Timeout}]); - -action(list_global_parameters, Node, [], _Opts, Inform, Timeout) -> - Inform("Listing global runtime parameters", []), - call_emitter(Node, {rabbit_runtime_parameters, list_global_formatted, []}, - rabbit_runtime_parameters:global_info_keys(), - [{timeout, Timeout}]); - -action(list_policies, Node, [], Opts, Inform, Timeout) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing policies", []), - call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, - rabbit_policy:info_keys(), - [{timeout, Timeout}]); - -action(list_operator_policies, Node, [], Opts, Inform, Timeout) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing policies", []), - call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]}, - rabbit_policy:info_keys(), - [{timeout, Timeout}]); - - -action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing vhosts", []), - ArgAtoms = default_if_empty(Args, [name]), - call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, - [{timeout, Timeout}, to_bin_utf8]); - -action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> - {error_string, - "list_user_permissions expects a username argument, but none provided."}; -action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> - Inform("Listing permissions for user ~p", Args), - call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, - rabbit_auth_backend_internal:user_perms_info_keys(), - [{timeout, Timeout}, to_bin_utf8, is_escaped]); - -action(list_queues, Node, Args, Opts, Inform, Timeout) -> - case rabbit_cli:mutually_exclusive_flags( - Opts, all, [{?ONLINE_OPT, online} - ,{?OFFLINE_OPT, offline} - ,{?LOCAL_OPT, local}]) of - {ok, Filter} -> - Inform("Listing queues", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, messages]), - - %% Data for emission - Nodes = nodes_in_cluster(Node, Timeout), - ChunksOpt = {chunks, get_number_of_chunks(Filter, Nodes)}, - TimeoutOpt = {timeout, Timeout}, - EmissionRef = make_ref(), - EmissionRefOpt = {ref, EmissionRef}, - - case Filter of - all -> - start_emission(Node, {rabbit_amqqueue, emit_info_all, - [Nodes, VHostArg, ArgAtoms]}, - [TimeoutOpt, EmissionRefOpt]), - start_emission(Node, {rabbit_amqqueue, emit_info_down, - [VHostArg, ArgAtoms]}, - [TimeoutOpt, EmissionRefOpt]); - online -> - start_emission(Node, {rabbit_amqqueue, emit_info_all, - [Nodes, VHostArg, ArgAtoms]}, - [TimeoutOpt, EmissionRefOpt]); - offline -> - start_emission(Node, {rabbit_amqqueue, emit_info_down, - [VHostArg, ArgAtoms]}, - [TimeoutOpt, EmissionRefOpt]); - local -> - start_emission(Node, {rabbit_amqqueue, emit_info_local, - [VHostArg, ArgAtoms]}, - [TimeoutOpt, EmissionRefOpt]) - end, - display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); - {error, ErrStr} -> - {error_string, ErrStr} - end; - -action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> - Inform("Listing exchanges", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, type]), - call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, [{timeout, Timeout}]); - -action(list_bindings, Node, Args, Opts, Inform, Timeout) -> - Inform("Listing bindings", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [source_name, source_kind, - destination_name, destination_kind, - routing_key, arguments]), - call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, [{timeout, Timeout}]); - -action(list_connections, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - Nodes = nodes_in_cluster(Node, Timeout), - call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, - ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); - -action(list_channels, Node, Args, _Opts, Inform, Timeout) -> - Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, - messages_unacknowledged]), - Nodes = nodes_in_cluster(Node, Timeout), - call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, - [{timeout, Timeout}, {chunks, length(Nodes)}]); - -action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> - Inform("Listing consumers", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Nodes = nodes_in_cluster(Node, Timeout), - call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, - rabbit_amqqueue:consumer_info_keys(), - [{timeout, Timeout}, {chunks, length(Nodes)}]); - -action(node_health_check, Node, _Args, _Opts, Inform, Timeout) -> - Inform("Checking health of node ~p", [Node]), - case rabbit_health_check:node(Node, Timeout) of - ok -> - io:format("Health check passed~n"), - ok; - Other -> - Other - end. - -format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). - -sync_queue(Q) -> - rabbit_mirror_queue_misc:sync_queue(Q). - -cancel_sync_queue(Q) -> - rabbit_mirror_queue_misc:cancel_sync_queue(Q). - -purge_queue(Q) -> - rabbit_amqqueue:with( - Q, fun(Q1) -> - rabbit_amqqueue:purge(Q1), - ok - end). - -%%---------------------------------------------------------------------------- - -require_mnesia_stopped(Node, Fun) -> - case Fun() of - {error, mnesia_unexpectedly_running} -> - {error_string, rabbit_misc:format( - " Mnesia is still running on node ~p. - Please stop the node with rabbitmqctl stop_app first.", [Node])}; - Other -> Other - end. - -wait_for_application(Node, PidFile, Application, Inform) -> - Pid = read_pid_file(PidFile, true), - Inform("pid is ~s", [Pid]), - wait_for_application(Node, Pid, Application). - -wait_for_application(Node, Pid, rabbit_and_plugins) -> - wait_for_startup(Node, Pid); -wait_for_application(Node, Pid, Application) -> - while_process_is_alive( - Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end). - -wait_for_startup(Node, Pid) -> - while_process_is_alive( - Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end). - -while_process_is_alive(Node, Pid, Activity) -> - case rabbit_misc:is_os_process_alive(Pid) of - true -> case Activity() of - true -> ok; - false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - while_process_is_alive(Node, Pid, Activity) - end; - false -> {error, process_not_running} - end. - -wait_for_process_death(Pid) -> - case rabbit_misc:is_os_process_alive(Pid) of - true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_process_death(Pid); - false -> ok - end. - -read_pid_file(PidFile, Wait) -> - case {file:read_file(PidFile), Wait} of - {{ok, Bin}, _} -> - S = binary_to_list(Bin), - {match, [PidS]} = re:run(S, "[^\\s]+", - [{capture, all, list}]), - try list_to_integer(PidS) - catch error:badarg -> - exit({error, {garbage_in_pid_file, PidFile}}) - end, - PidS; - {{error, enoent}, true} -> - timer:sleep(?EXTERNAL_CHECK_INTERVAL), - read_pid_file(PidFile, Wait); - {{error, _} = E, _} -> - exit({error, {could_not_read_pid, E}}) - end. - -become(BecomeNode) -> - error_logger:tty(false), - case net_adm:ping(BecomeNode) of - pong -> exit({node_running, BecomeNode}); - pang -> ok = net_kernel:stop(), - io:format(" * Impersonating node: ~s...", [BecomeNode]), - {ok, _} = rabbit_cli:start_distribution(BecomeNode), - io:format(" done~n", []), - Dir = mnesia:system_info(directory), - io:format(" * Mnesia directory : ~s~n", [Dir]) - end. - -%%---------------------------------------------------------------------------- - -default_if_empty(List, Default) when is_list(List) -> - if List == [] -> Default; - true -> [list_to_atom(X) || X <- List] - end. - -display_info_message_row(IsEscaped, Result, InfoItemKeys) -> - display_row([format_info_item( - case proplists:lookup(X, Result) of - none when is_list(Result), length(Result) > 0 -> - exit({error, {bad_info_key, X}}); - none -> Result; - {X, Value} -> Value - end, IsEscaped) || X <- InfoItemKeys]). - -display_info_message(IsEscaped, InfoItemKeys) -> - fun ([], _) -> - ok; - ([FirstResult|_] = List, _) when is_list(FirstResult) -> - lists:foreach(fun(Result) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) - end, - List), - ok; - (Result, _) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys), - ok - end. - -display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach( - fun (Result) -> display_row( - [format_info_item(proplists:get_value(X, Result), true) - || X <- InfoItemKeys]) - end, lists:sort(Results)), - ok; -display_info_list(Other, _) -> - Other. - -display_row(Row) -> - io:fwrite(string:join(Row, "\t")), - io:nl(). - --define(IS_U8(X), (X >= 0 andalso X =< 255)). --define(IS_U16(X), (X >= 0 andalso X =< 65535)). - -format_info_item(#resource{name = Name}, IsEscaped) -> - escape(Name, IsEscaped); -format_info_item({N1, N2, N3, N4} = Value, _IsEscaped) when - ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> - rabbit_misc:ntoa(Value); -format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value, _IsEscaped) when - ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), - ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> - rabbit_misc:ntoa(Value); -format_info_item(Value, _IsEscaped) when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); -format_info_item(Value, IsEscaped) when is_binary(Value) -> - escape(Value, IsEscaped); -format_info_item(Value, IsEscaped) when is_atom(Value) -> - escape(atom_to_list(Value), IsEscaped); -format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = - Value, IsEscaped) when is_binary(TableEntryKey) andalso - is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value, IsEscaped)]); -format_info_item([T | _] = Value, IsEscaped) - when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse - is_list(T) -> - "[" ++ - lists:nthtail(2, lists:append( - [", " ++ format_info_item(E, IsEscaped) - || E <- Value])) ++ "]"; -format_info_item({Key, Value}, IsEscaped) -> - "{" ++ io_lib:format("~p", [Key]) ++ ", " ++ - format_info_item(Value, IsEscaped) ++ "}"; -format_info_item(Value, _IsEscaped) -> - io_lib:format("~w", [Value]). - -display_call_result(Node, MFA) -> - case call(Node, MFA) of - {badrpc, _} = Res -> throw(Res); - Res -> io:format("~p~n", [Res]), - ok - end. - -unsafe_rpc(Node, Mod, Fun, Args) -> - unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). - -unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> - case rpc_call(Node, Mod, Fun, Args, Timeout) of - {badrpc, _} = Res -> throw(Res); - Normal -> Normal - end. - -ensure_app_running(Node) -> - case call(Node, {rabbit, is_running, []}) of - true -> ok; - false -> {error_string, - rabbit_misc:format( - "rabbit application is not running on node ~s.~n" - " * Suggestion: start it with \"rabbitmqctl start_app\" " - "and try again", [Node])}; - Other -> Other - end. - -call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). - -call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> - Ref = start_emission(Node, {Mod, Fun, Args}, Opts), - display_emission_result(Ref, InfoKeys, Opts). - -start_emission(Node, {Mod, Fun, Args}, Opts) -> - ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), - Timeout = proplists:get_value(timeout, Opts, infinity), - Ref = proplists:get_value(ref, Opts, make_ref()), - rabbit_control_misc:spawn_emitter_caller( - Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), - Ref, self(), Timeout), - Ref. - -display_emission_result(Ref, InfoKeys, Opts) -> - IsEscaped = proplists:get_value(is_escaped, Opts, false), - Chunks = proplists:get_value(chunks, Opts, 1), - Timeout = proplists:get_value(timeout, Opts, infinity), - EmissionStatus = rabbit_control_misc:wait_for_info_messages( - self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), - emission_to_action_result(EmissionStatus). - -%% Convert rabbit_control_misc:wait_for_info_messages/6 return value -%% into form expected by rabbit_cli:main/3. -emission_to_action_result({ok, ok}) -> - ok; -emission_to_action_result({error, Error}) -> - Error. - -prepare_call_args(Args, ToBinUtf8) -> - case ToBinUtf8 of - true -> valid_utf8_args(Args); - false -> Args - end. - -valid_utf8_args(Args) -> - lists:map(fun list_to_binary_utf8/1, Args). - -list_to_binary_utf8(L) -> - B = list_to_binary(L), - case rabbit_binary_parser:validate_utf8(B) of - ok -> B; - error -> throw({error, {not_utf_8, L}}) - end. - -%% escape does C-style backslash escaping of non-printable ASCII -%% characters. We don't escape characters above 127, since they may -%% form part of UTF-8 strings. - -escape(Atom, IsEscaped) when is_atom(Atom) -> - escape(atom_to_list(Atom), IsEscaped); -escape(Bin, IsEscaped) when is_binary(Bin) -> - escape(binary_to_list(Bin), IsEscaped); -escape(L, false) when is_list(L) -> - escape_char(lists:reverse(L), []); -escape(L, true) when is_list(L) -> - L. - -escape_char([$\\ | T], Acc) -> - escape_char(T, [$\\, $\\ | Acc]); -escape_char([X | T], Acc) when X >= 32, X /= 127 -> - escape_char(T, [X | Acc]); -escape_char([X | T], Acc) -> - escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), - $0 + (X band 7) | Acc]); -escape_char([], Acc) -> - Acc. - -prettify_amqp_table(Table, IsEscaped) -> - [{escape(K, IsEscaped), prettify_typed_amqp_value(T, V, IsEscaped)} - || {K, T, V} <- Table]. - -prettify_typed_amqp_value(longstr, Value, IsEscaped) -> - escape(Value, IsEscaped); -prettify_typed_amqp_value(table, Value, IsEscaped) -> - prettify_amqp_table(Value, IsEscaped); -prettify_typed_amqp_value(array, Value, IsEscaped) -> - [prettify_typed_amqp_value(T, V, IsEscaped) || {T, V} <- Value]; -prettify_typed_amqp_value(_Type, Value, _IsEscaped) -> - Value. - -split_list([]) -> []; -split_list([_]) -> exit(even_list_needed); -split_list([A, B | T]) -> [{A, B} | split_list(T)]. - -nodes_in_cluster(Node) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). - -nodes_in_cluster(Node, Timeout) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). - -alarms_by_node(Name) -> - case rpc_call(Name, rabbit, status, []) of - {badrpc,nodedown} -> {Name, [nodedown]}; - Status -> - {_, As} = lists:keyfind(alarms, 1, Status), - {Name, As} - end. - -get_number_of_chunks(all, Nodes) -> - length(Nodes) + 1; -get_number_of_chunks(online, Nodes) -> - length(Nodes); -get_number_of_chunks(offline, _) -> - 1; -get_number_of_chunks(local, _) -> - 1. diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl index 8beee10846..c1ed613088 100644 --- a/src/rabbit_lager.erl +++ b/src/rabbit_lager.erl @@ -210,7 +210,7 @@ configure_lager() -> %% messages to the default sink. To know the list of expected extra %% sinks, we look at the 'lager_extra_sinks' compilation option. Sinks0 = application:get_env(lager, extra_sinks, []), - Sinks1 = configure_extra_sinks(Sinks0, + Sinks1 = configure_extra_sinks(Sinks0, [error_logger | list_expected_sinks()]), %% TODO Waiting for basho/lager#303 %% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1, @@ -231,11 +231,7 @@ configure_lager() -> configure_extra_sinks(Sinks, [SinkName | Rest]) -> Sink0 = proplists:get_value(SinkName, Sinks, []), Sink1 = case proplists:is_defined(handlers, Sink0) of - false -> lists:keystore(handlers, 1, Sink0, - {handlers, - [{lager_forwarder_backend, - lager_util:make_internal_sink_name(lager) - }]}); + false -> default_sink_config(SinkName, Sink0); true -> Sink0 end, Sinks1 = lists:keystore(SinkName, 1, Sinks, {SinkName, Sink1}), @@ -243,6 +239,17 @@ configure_extra_sinks(Sinks, [SinkName | Rest]) -> configure_extra_sinks(Sinks, []) -> Sinks. +default_sink_config(rabbit_log_upgrade_lager_event, Sink) -> + Handlers = lager_handlers(application:get_env(rabbit, + lager_handler_upgrade, + tty)), + lists:keystore(handlers, 1, Sink, {handlers, Handlers}); +default_sink_config(_, Sink) -> + lists:keystore(handlers, 1, Sink, + {handlers, + [{lager_forwarder_backend, + lager_util:make_internal_sink_name(lager)}]}). + list_expected_sinks() -> case application:get_env(rabbit, lager_extra_sinks) of {ok, List} -> diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index be5f0146b6..22181ce8b7 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -78,6 +78,7 @@ make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event; make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event; make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event; make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event; +make_internal_sink_name(rabbit_log_upgrade) -> rabbit_log_upgrade_lager_event; make_internal_sink_name(Category) -> lager_util:make_internal_sink_name(Category). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 51deed8597..9d1282b936 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -877,7 +877,7 @@ check_rabbit_consistency(Remote) -> %% that a `reset' would leave it in. We cannot simply check if the %% mnesia tables aren't there because restarted RAM nodes won't have %% tables while still being non-virgin. What we do instead is to -%% check if the mnesia directory is non existant or empty, with the +%% check if the mnesia directory is non existent or empty, with the %% exception of the cluster status files, which will be there thanks to %% `rabbit_node_monitor:prepare_cluster_status_file/0'. is_virgin_node() -> diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index 2d7e0f56b6..bcaaf117ff 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -70,13 +70,13 @@ rename(Node, NodeMapList) -> ok = rabbit_mnesia:copy_db(mnesia_copy_dir()), %% And make the actual changes - rabbit_control_main:become(FromNode), + become(FromNode), take_backup(before_backup_name()), convert_backup(NodeMap, before_backup_name(), after_backup_name()), ok = rabbit_file:write_term_file(rename_config_name(), [{FromNode, ToNode}]), convert_config_files(NodeMap), - rabbit_control_main:become(ToNode), + become(ToNode), restore_backup(after_backup_name()), ok after @@ -267,3 +267,15 @@ transform_table(Table, Map, Key) -> [Term] = mnesia:read(Table, Key, write), ok = mnesia:write(Table, update_term(Map, Term), write), transform_table(Table, Map, mnesia:next(Table, Key)). + +become(BecomeNode) -> + error_logger:tty(false), + case net_adm:ping(BecomeNode) of + pong -> exit({node_running, BecomeNode}); + pang -> ok = net_kernel:stop(), + io:format(" * Impersonating node: ~s...", [BecomeNode]), + {ok, _} = rabbit_cli:start_distribution(BecomeNode), + io:format(" done~n", []), + Dir = mnesia:system_info(directory), + io:format(" * Mnesia directory : ~s~n", [Dir]) + end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8e2b1c0d49..6c9eb92cff 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/4, successfully_recovered_state/1, +-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, write/3, write_flow/3, read/2, contains/2, remove/2]). @@ -63,7 +63,7 @@ %% the module for index ops, %% rabbit_msg_store_ets_index by default index_module, - %% %% where are messages? + %% where are messages? index_state, %% current file name as number current_file, @@ -91,8 +91,6 @@ flying_ets, %% set of dying clients dying_clients, - %% index of file positions for client death messages - dying_client_index, %% map of references of all registered clients %% to callbacks clients, @@ -265,7 +263,7 @@ %% updated. %% %% On non-clean startup, we scan the files we discover, dealing with -%% the possibilites of a crash having occured during a compaction +%% the possibilites of a crash having occurred during a compaction %% (this consists of tidyup - the compaction is deliberately designed %% such that data is duplicated on disk rather than risking it being %% lost), and rebuild the file summary and index ETS table. @@ -310,7 +308,7 @@ %% From this reasoning, we do have a bound on the number of times the %% message is rewritten. From when it is inserted, there can be no %% files inserted between it and the head of the queue, and the worst -%% case is that everytime it is rewritten, it moves one position lower +%% case is that every time it is rewritten, it moves one position lower %% in the file (for it to stay at the same position requires that %% there are no holes beneath it, which means truncate would be used %% and so it would not be rewritten at all). Thus this seems to @@ -352,7 +350,7 @@ %% because in the event of the same message being sent to several %% different queues, there is the possibility of one queue writing and %% removing the message before other queues write it at all. Thus -%% accomodating 0-reference counts allows us to avoid unnecessary +%% accommodating 0-reference counts allows us to avoid unnecessary %% writes here. Of course, there are complications: the file to which %% the message has already been written could be locked pending %% deletion or GC, which means we have to rewrite the message as the @@ -474,15 +472,20 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Server, Dir, ClientRefs, StartupFunState) -> - gen_server2:start_link({local, Server}, ?MODULE, - [Server, Dir, ClientRefs, StartupFunState], +start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> + gen_server2:start_link(?MODULE, + [Name, Dir, ClientRefs, StartupFunState], + [{timeout, infinity}]). + +start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> + gen_server2:start_link({local, Name}, ?MODULE, + [Name, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( @@ -522,7 +525,7 @@ write_flow(MsgId, Msg, %% rabbit_amqqueue_process process via the %% rabbit_variable_queue. We are accessing the %% rabbit_amqqueue_process process dictionary. - credit_flow:send(whereis(Server), CreditDiscBound), + credit_flow:send(Server, CreditDiscBound), client_write(MsgId, Msg, flow, CState). write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). @@ -548,7 +551,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). -set_maximum_since_use(Server, Age) -> +set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -699,27 +702,25 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - dying_clients = DyingClients, - dying_client_index = DyingIndex }) -> - ets:delete(DyingIndex, CRef), + dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), - dying_clients = sets:del_element(CRef, DyingClients) }. + dying_clients = maps:remove(CRef, DyingClients) }. %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, StartupFunState]) -> +init([Name, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - Dir = filename:join(BaseDir, atom_to_list(Server)), + Dir = filename:join(BaseDir, atom_to_list(Name)), - {ok, IndexModule} = application:get_env(msg_store_index_module), - rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), + {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module), + rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]), AttemptFileSummaryRecovery = case ClientRefs of @@ -738,7 +739,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, - ClientRefs, Dir, Server), + ClientRefs, Dir), Clients = dict:from_list( [{CRef, {undefined, undefined, undefined}} || CRef <- ClientRefs1]), @@ -755,10 +756,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), - DyingIndex = ets:new(rabbit_msg_store_dying_client_index, - [set, public, {keypos, #dying_client.client_ref}]), - {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), + {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit), {ok, GCPid} = rabbit_msg_store_gc:start_link( #gc_state { dir = Dir, @@ -787,8 +786,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, flying_ets = FlyingEts, - dying_clients = sets:new(), - dying_client_index = DyingIndex, + dying_clients = #{}, clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -866,14 +864,14 @@ handle_call({contains, MsgId}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients, - dying_client_index = DyingIndex, current_file_handle = CurHdl, current_file = CurFile }) -> - DyingClients1 = sets:add_element(CRef, DyingClients), {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef, - file = CurFile, - offset = CurOffset}), + DyingClients1 = maps:put(CRef, + #dying_client{client_ref = CRef, + file = CurFile, + offset = CurOffset}, + DyingClients), noreply(State #msstate { dying_clients = DyingClients1 }); handle_cast({client_delete, CRef}, @@ -995,12 +993,25 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State2 end, State3 = close_all_handles(State1), - ok = store_file_summary(FileSummaryEts, Dir), + case store_file_summary(FileSummaryEts, Dir) of + ok -> ok; + {error, FSErr} -> + rabbit_log:error("Unable to store file summary" + " for vhost message store for directory ~p~n" + "Error: ~p~n", + [Dir, FSErr]) + end, [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), - ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, - {index_module, IndexModule}], Dir), + case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, + {index_module, IndexModule}], Dir) of + ok -> ok; + {error, RTErr} -> + rabbit_log:error("Unable to save message store recovery terms" + "for directory ~p~nError: ~p~n", + [Dir, RTErr]) + end, State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -1357,17 +1368,15 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) -> %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. should_mask_action(CRef, MsgId, - State = #msstate { dying_clients = DyingClients, - dying_client_index = DyingIndex }) -> - case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of - {false, Location} -> + State = #msstate{dying_clients = DyingClients}) -> + case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of + {error, Location} -> {false, Location}; - {true, not_found} -> + {{ok, _}, not_found} -> {true, not_found}; - {true, #msg_location { file = File, offset = Offset, - ref_count = RefCount } = Location} -> - [#dying_client { file = DeathFile, offset = DeathOffset }] = - ets:lookup(DyingIndex, CRef), + {{ok, Client}, #msg_location { file = File, offset = Offset, + ref_count = RefCount } = Location} -> + #dying_client{file = DeathFile, offset = DeathOffset} = Client, {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of {true, _} -> true; {false, 0} -> false_if_increment; @@ -1538,16 +1547,16 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) -> {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> - rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) -> + rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]), {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" + rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n" "rebuilding indices from scratch~n", - [Server | ErrorArgs]), + [Dir | ErrorArgs]), {false, IndexModule:new(Dir), []} end, case read_recovery_terms(Dir) of @@ -1582,7 +1591,7 @@ read_recovery_terms(Dir) -> end. store_file_summary(Tid, Dir) -> - ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), + ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), [{extended_info, [object_count]}]). recover_file_summary(false, _Dir) -> diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 76ef112069..0e8b7174e2 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -74,6 +74,12 @@ delete_by_file(File, State) -> ok. terminate(#state { table = MsgLocations, dir = Dir }) -> - ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME), - [{extended_info, [object_count]}]), + case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME), + [{extended_info, [object_count]}]) of + ok -> ok; + {error, Err} -> + rabbit_log:error("Unable to save message store index" + " for directory ~p.~nError: ~p~n", + [Dir, Err]) + end, ets:delete(MsgLocations). diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl new file mode 100644 index 0000000000..0209e88cf7 --- /dev/null +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -0,0 +1,93 @@ +-module(rabbit_msg_store_vhost_sup). + +-behaviour(supervisor2). + +-export([start_link/3, init/1, add_vhost/2, delete_vhost/2, + client_init/5, successfully_recovered_state/2]). + +%% Internal +-export([start_store_for_vhost/4]). + +start_link(Name, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs); + VhostsClientRefs == undefined -> + supervisor2:start_link({local, Name}, ?MODULE, + [Name, VhostsClientRefs, StartupFunState]). + +init([Name, VhostsClientRefs, StartupFunState]) -> + ets:new(Name, [named_table, public]), + {ok, {{simple_one_for_one, 1, 1}, + [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, + [Name, VhostsClientRefs, StartupFunState]}, + transient, infinity, supervisor, [rabbit_msg_store]}]}}. + + +add_vhost(Name, VHost) -> + supervisor2:start_child(Name, [VHost]). + +start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) -> + case vhost_store_pid(Name, VHost) of + no_pid -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + ok = rabbit_file:ensure_dir(VHostDir), + rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), + VhostRefs = refs_for_vhost(VHost, VhostsClientRefs), + case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of + {ok, Pid} -> + ets:insert(Name, {VHost, Pid}), + {ok, Pid}; + Other -> Other + end; + Pid when is_pid(Pid) -> + {error, {already_started, Pid}} + end. + +refs_for_vhost(_, undefined) -> undefined; +refs_for_vhost(VHost, Refs) -> + case maps:find(VHost, Refs) of + {ok, Val} -> Val; + error -> [] + end. + + +delete_vhost(Name, VHost) -> + case vhost_store_pid(Name, VHost) of + no_pid -> ok; + Pid when is_pid(Pid) -> + supervisor2:terminate_child(Name, Pid), + cleanup_vhost_store(Name, VHost, Pid) + end, + ok. + +client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> + VHostPid = maybe_start_store_for_vhost(Name, VHost), + rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun). + +maybe_start_store_for_vhost(Name, VHost) -> + case add_vhost(Name, VHost) of + {ok, Pid} -> Pid; + {error, {already_started, Pid}} -> Pid; + Error -> throw(Error) + end. + +vhost_store_pid(Name, VHost) -> + case ets:lookup(Name, VHost) of + [] -> no_pid; + [{VHost, Pid}] -> + case erlang:is_process_alive(Pid) of + true -> Pid; + false -> + cleanup_vhost_store(Name, VHost, Pid), + no_pid + end + end. + +cleanup_vhost_store(Name, VHost, Pid) -> + ets:delete_object(Name, {VHost, Pid}). + +successfully_recovered_state(Name, VHost) -> + case vhost_store_pid(Name, VHost) of + no_pid -> + throw({message_store_not_started, Name, VHost}); + Pid when is_pid(Pid) -> + rabbit_msg_store:successfully_recovered_state(Pid) + end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 9da68b7640..54f4180244 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -67,22 +67,15 @@ ensure(FileJustChanged0) -> {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}} end. +%% @doc Prepares the file system and installs all enabled plugins. setup() -> - case application:get_env(rabbit, plugins_expand_dir) of - {ok, ExpandDir} -> - case filelib:is_dir(ExpandDir) of - true -> - rabbit_log:info( - "\"~s\" is no longer used to expand plugins.~n" - "RabbitMQ still manages this directory " - "but will stop doing so in the future.", [ExpandDir]), - - _ = delete_recursively(ExpandDir); - false -> - ok - end; - undefined -> - ok + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + %% Eliminate the contents of the destination directory + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) end, {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), @@ -135,61 +128,10 @@ extract_schema(#plugin{type = dir, location = Location}, SchemaDir) -> %% @doc Lists the plugins which are currently running. active() -> - LoadedPluginNames = maybe_keep_required_deps(false, loaded_plugin_names()), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + InstalledPlugins = plugin_names(list(ExpandDir)), [App || {App, _, _} <- rabbit_misc:which_applications(), - lists:member(App, LoadedPluginNames)]. - -loaded_plugin_names() -> - {ok, PluginsPath} = application:get_env(rabbit, plugins_dir), - PluginsDirs = split_path(PluginsPath), - lists:flatmap( - fun(PluginsDir) -> - PluginsDirComponents = filename:split(PluginsDir), - loaded_plugin_names(code:get_path(), PluginsDirComponents, []) - end, - PluginsDirs). - -loaded_plugin_names([Path | OtherPaths], PluginsDirComponents, PluginNames) -> - case lists:sublist(filename:split(Path), length(PluginsDirComponents)) of - PluginsDirComponents -> - case build_plugin_name_from_code_path(Path) of - undefined -> - loaded_plugin_names( - OtherPaths, PluginsDirComponents, PluginNames); - PluginName -> - loaded_plugin_names( - OtherPaths, PluginsDirComponents, - [list_to_atom(PluginName) | PluginNames]) - end; - _ -> - loaded_plugin_names(OtherPaths, PluginsDirComponents, PluginNames) - end; -loaded_plugin_names([], _, PluginNames) -> - PluginNames. - -build_plugin_name_from_code_path(Path) -> - AppPath = case filelib:is_dir(Path) of - true -> - case filelib:wildcard(filename:join(Path, "*.app")) of - [AP | _] -> AP; - [] -> undefined - end; - false -> - EZ = filename:dirname(filename:dirname(Path)), - case filelib:is_regular(EZ) of - true -> - case find_app_path_in_ez(EZ) of - {ok, AP} -> AP; - _ -> undefined - end; - _ -> - undefined - end - end, - case AppPath of - undefined -> undefined; - _ -> filename:basename(AppPath, ".app") - end. + lists:member(App, InstalledPlugins)]. %% @doc Get the list of plugins which are ready to be enabled. list(PluginsPath) -> @@ -279,19 +221,25 @@ running_plugins() -> %%---------------------------------------------------------------------------- prepare_plugins(Enabled) -> - AllPlugins = installed_plugins(), + {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + AllPlugins = list(PluginsDistDir), Wanted = dependencies(false, Enabled, AllPlugins), WantedPlugins = lookup_plugins(Wanted, AllPlugins), {ValidPlugins, Problems} = validate_plugins(WantedPlugins), maybe_warn_about_invalid_plugins(Problems), + case filelib:ensure_dir(ExpandDir ++ "/") of + ok -> ok; + {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, + [ExpandDir, E2]}}) + end, + [prepare_plugin(Plugin, ExpandDir) || Plugin <- ValidPlugins], - [prepare_dir_plugin(ValidPlugin) || ValidPlugin <- ValidPlugins], + [prepare_dir_plugin(PluginAppDescPath) || + PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")], Wanted. -installed_plugins() -> - {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), - list(PluginsDistDir). - maybe_warn_about_invalid_plugins([]) -> ok; maybe_warn_about_invalid_plugins(InvalidPlugins) -> @@ -404,60 +352,40 @@ is_version_supported(Version, ExpectedVersions) -> end. clean_plugins(Plugins) -> - [clean_plugin(Plugin) || Plugin <- Plugins]. + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins]. -clean_plugin(Plugin) -> +clean_plugin(Plugin, ExpandDir) -> {ok, Mods} = application:get_key(Plugin, modules), - PluginEbinDir = code:lib_dir(Plugin, ebin), - application:unload(Plugin), [begin code:soft_purge(Mod), code:delete(Mod), false = code:is_loaded(Mod) end || Mod <- Mods], - - code:del_path(PluginEbinDir). - -plugin_ebin_dir(#plugin{type = ez, location = Location}) -> - case find_app_path_in_ez(Location) of - {ok, AppPath} -> - filename:join(Location, filename:dirname(AppPath)); - {error, Reason} -> - {error, Reason} - end; -plugin_ebin_dir(#plugin{type = dir, location = Location}) -> - filename:join(Location, "ebin"). - -prepare_dir_plugin(#plugin{name = Name} = Plugin) -> - PluginEbinDir = case plugin_ebin_dir(Plugin) of - {error, Reason} -> - throw({plugin_ebin_dir_not_found, Name, Reason}); - Dir -> - Dir - end, - case code:add_patha(PluginEbinDir) of - true -> - case filelib:wildcard(PluginEbinDir++ "/*.beam") of - [] -> + delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])). + +prepare_dir_plugin(PluginAppDescPath) -> + PluginEbinDir = filename:dirname(PluginAppDescPath), + Plugin = filename:basename(PluginAppDescPath, ".app"), + code:add_patha(PluginEbinDir), + case filelib:wildcard(PluginEbinDir++ "/*.beam") of + [] -> + ok; + [BeamPath | _] -> + Module = list_to_atom(filename:basename(BeamPath, ".beam")), + case code:ensure_loaded(Module) of + {module, _} -> ok; - [BeamPath | _] -> - Module = list_to_atom(filename:basename(BeamPath, ".beam")), - case code:ensure_loaded(Module) of - {module, _} -> - ok; - {error, badfile} -> - rabbit_log:error("Failed to enable plugin \"~s\": " - "it may have been built with an " - "incompatible (more recent?) " - "version of Erlang~n", [Name]), - throw({plugin_built_with_incompatible_erlang, Name}); - Error -> - throw({plugin_module_unloadable, Name, Error}) - end - end; - {error, bad_directory} -> - throw({plugin_ebin_path_incorrect, Name, PluginEbinDir}) + {error, badfile} -> + rabbit_log:error("Failed to enable plugin \"~s\": " + "it may have been built with an " + "incompatible (more recent?) " + "version of Erlang~n", [Plugin]), + throw({plugin_built_with_incompatible_erlang, Plugin}); + Error -> + throw({plugin_module_unloadable, Plugin, Error}) + end end. %%---------------------------------------------------------------------------- @@ -468,6 +396,12 @@ delete_recursively(Fn) -> {error, {Path, E}} -> {error, {cannot_delete, Path, E}} end. +prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) -> + zip:unzip(Location, [{cwd, ExpandDir}]); +prepare_plugin(#plugin{type = dir, name = Name, location = Location}, + ExpandDir) -> + rabbit_file:recursive_copy(Location, filename:join([ExpandDir, Name])). + plugin_info({ez, EZ}) -> case read_app_file(EZ) of {application, Name, Props} -> mkplugin(Name, Props, ez, EZ); @@ -494,12 +428,14 @@ mkplugin(Name, Props, Type, Location) -> broker_version_requirements = BrokerVersions, dependency_version_requirements = DepsVersions}. -find_app_path_in_ez(EZ) -> +read_app_file(EZ) -> case zip:list_dir(EZ) of {ok, [_|ZippedFiles]} -> case find_app_files(ZippedFiles) of [AppPath|_] -> - {ok, AppPath}; + {ok, [{AppPath, AppFile}]} = + zip:extract(EZ, [{file_list, [AppPath]}, memory]), + parse_binary(AppFile); [] -> {error, no_app_file} end; @@ -507,16 +443,6 @@ find_app_path_in_ez(EZ) -> {error, {invalid_ez, Reason}} end. -read_app_file(EZ) -> - case find_app_path_in_ez(EZ) of - {ok, AppPath} -> - {ok, [{AppPath, AppFile}]} = - zip:extract(EZ, [{file_list, [AppPath]}, memory]), - parse_binary(AppFile); - {error, Reason} -> - {error, Reason} - end. - find_app_files(ZippedFiles) -> {ok, RE} = re:compile("^.*/ebin/.*.app$"), [Path || {zip_file, Path, _, _, _, _} <- ZippedFiles, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8b96bbffbd..793eb3e514 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,6 +23,10 @@ read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). +-export([scan_queue_segments/3]). + +%% Migrates from global to per-vhost message stores +-export([move_to_per_vhost_stores/1, update_recovery_term/2]). -define(CLEAN_FILENAME, "clean.dot"). @@ -123,7 +127,7 @@ -define(SEGMENT_EXTENSION, ".idx"). %% TODO: The segment size would be configurable, but deriving all the -%% other values is quite hairy and quite possibly noticably less +%% other values is quite hairy and quite possibly noticeably less %% efficient, depending on how clever the compiler is when it comes to %% binary generation/matching with constant vs variable lengths. @@ -475,11 +479,10 @@ start(DurableQueueNames) -> end, {[], sets:new()}, DurableQueueNames), %% Any queue directory we've not been asked to recover is considered garbage - QueuesDir = queues_dir(), rabbit_file:recursive_delete( - [filename:join(QueuesDir, DirName) || - DirName <- all_queue_directory_names(QueuesDir), - not sets:is_element(DirName, DurableDirectories)]), + [DirName || + DirName <- all_queue_directory_names(), + not sets:is_element(filename:basename(DirName), DurableDirectories)]), rabbit_recovery_terms:clear(), @@ -490,12 +493,9 @@ start(DurableQueueNames) -> stop() -> rabbit_recovery_terms:stop(). -all_queue_directory_names(Dir) -> - case rabbit_file:list_dir(Dir) of - {ok, Entries} -> [E || E <- Entries, - rabbit_file:is_dir(filename:join(Dir, E))]; - {error, enoent} -> [] - end. +all_queue_directory_names() -> + filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(), + "queues", "*"])). %%---------------------------------------------------------------------------- %% startup and shutdown @@ -508,14 +508,20 @@ erase_index_dir(Dir) -> end. blank_state(QueueName) -> - blank_state_dir( - filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + blank_state_dir(queue_dir(QueueName)). blank_state_dir(Dir) -> blank_state_dir_funs(Dir, fun (_) -> ok end, fun (_) -> ok end). +queue_dir(#resource{ virtual_host = VHost } = QueueName) -> + %% Queue directory is + %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue} + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + QueueDir = queue_name_to_dir_name(QueueName), + filename:join([VHostDir, "queues", QueueDir]). + blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), @@ -629,8 +635,8 @@ queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), rabbit_misc:format("~.36B", [Num]). -queues_dir() -> - filename:join(rabbit_mnesia:dir(), "queues"). +queues_base_dir() -> + rabbit_mnesia:dir(). %%---------------------------------------------------------------------------- %% msg store startup delta function @@ -660,20 +666,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = blank_state(QueueName), - ok = scan_segments( + ok = scan_queue_segments( fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> Acc - end, ok, State), + end, ok, QueueName), ok = gatherer:finish(Gatherer). -scan_segments(Fun, Acc, State) -> - State1 = #qistate { segments = Segments, dir = Dir } = - recover_journal(State), +scan_queue_segments(Fun, Acc, QueueName) -> + State = #qistate { segments = Segments, dir = Dir } = + recover_journal(blank_state(QueueName)), Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( @@ -682,8 +687,8 @@ scan_segments(Fun, Acc, State) -> Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) - end, Acc, all_segment_nums(State1)), - {_SegmentCounts, _State} = terminate(State1), + end, Acc, all_segment_nums(State)), + {_SegmentCounts, _State} = terminate(State), Result. %%---------------------------------------------------------------------------- @@ -1353,15 +1358,13 @@ store_msg_segment(_) -> %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> - QueuesDir = queues_dir(), - QueueDirNames = all_queue_directory_names(QueuesDir), + QueueDirNames = all_queue_directory_names(), {ok, Gatherer} = gatherer:start_link(), [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( fun () -> - transform_queue(filename:join(QueuesDir, QueueDirName), - Gatherer, Funs) + transform_queue(QueueDirName, Gatherer, Funs) end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), @@ -1402,3 +1405,21 @@ drive_transform_fun(Fun, Hdl, Contents) -> {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output), drive_transform_fun(Fun, Hdl, Contents1) end. + +move_to_per_vhost_stores(#resource{} = QueueName) -> + OldQueueDir = filename:join([queues_base_dir(), "queues", + queue_name_to_dir_name(QueueName)]), + NewQueueDir = queue_dir(QueueName), + case rabbit_file:is_dir(OldQueueDir) of + true -> + ok = rabbit_file:ensure_dir(NewQueueDir), + ok = rabbit_file:rename(OldQueueDir, NewQueueDir); + false -> + rabbit_log:info("Queue index directory not found for queue ~p~n", + [QueueName]) + end, + ok. + +update_recovery_term(#resource{} = QueueName, Term) -> + Key = queue_name_to_dir_name(QueueName), + rabbit_recovery_terms:store(Key, Term). diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 94018a5b54..cee5408f0a 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -27,7 +27,7 @@ %% %% The most obvious use case for runtime parameters is policies but %% there are others: -%% +%% %% * Plugin-specific parameters that only make sense at runtime, %% e.g. Federation and Shovel link settings %% * Exchange and queue decorators diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index ad70540e5b..a457938dc9 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, start_child/3, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4, start_supervisor_child/1, start_supervisor_child/2, start_supervisor_child/3, start_restartable_child/1, start_restartable_child/2, @@ -37,6 +37,7 @@ -spec start_child(atom()) -> 'ok'. -spec start_child(atom(), [any()]) -> 'ok'. -spec start_child(atom(), atom(), [any()]) -> 'ok'. +-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'. -spec start_supervisor_child(atom()) -> 'ok'. -spec start_supervisor_child(atom(), [any()]) -> 'ok'. -spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'. @@ -60,6 +61,13 @@ start_child(ChildId, Mod, Args) -> {ChildId, {Mod, start_link, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). +start_child(ChildId, Mod, Fun, Args) -> + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, Fun, Args}, + transient, ?WORKER_WAIT, worker, [Mod]})). + + start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f88b7cc73f..95af84de39 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -17,6 +17,7 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0, + maybe_migrate_queues_to_per_vhost_storage/0, nodes_running/1, secondary_upgrade/1]). -include("rabbit.hrl"). @@ -98,7 +99,7 @@ ensure_backup_taken() -> _ -> ok end; true -> - error("Found lock file at ~s. + rabbit_log:error("Found lock file at ~s. Either previous upgrade is in progress or has failed. Database backup path: ~s", [lock_filename(), backup_dir()]), @@ -107,6 +108,7 @@ ensure_backup_taken() -> take_backup() -> BackupDir = backup_dir(), + info("upgrades: Backing up mnesia dir to ~p~n", [BackupDir]), case rabbit_mnesia:copy_db(BackupDir) of ok -> info("upgrades: Mnesia dir backed up to ~p~n", [BackupDir]); @@ -126,7 +128,9 @@ remove_backup() -> maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:cluster_nodes(all), ok = rabbit_mnesia_rename:maybe_finish(AllNodes), - case rabbit_version:upgrades_required(mnesia) of + %% Mnesia upgrade is the first upgrade scope, + %% so we should create a backup here if there are any upgrades + case rabbit_version:all_upgrades_required([mnesia, local, message_store]) of {error, starting_from_scratch} -> ok; {error, version_not_available} -> @@ -142,10 +146,15 @@ maybe_upgrade_mnesia() -> ok; {ok, Upgrades} -> ensure_backup_taken(), - ok = case upgrade_mode(AllNodes) of - primary -> primary_upgrade(Upgrades, AllNodes); - secondary -> secondary_upgrade(AllNodes) - end + run_mnesia_upgrades(proplists:get_value(mnesia, Upgrades, []), + AllNodes) + end. + +run_mnesia_upgrades([], _) -> ok; +run_mnesia_upgrades(Upgrades, AllNodes) -> + case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) end. upgrade_mode(AllNodes) -> @@ -243,15 +252,32 @@ maybe_upgrade_local() -> {ok, []} -> ensure_backup_removed(), ok; {ok, Upgrades} -> mnesia:stop(), - ensure_backup_taken(), ok = apply_upgrades(local, Upgrades, fun () -> ok end), - ensure_backup_removed(), ok end. %% ------------------------------------------------------------------- +maybe_migrate_queues_to_per_vhost_storage() -> + Result = case rabbit_version:upgrades_required(message_store) of + {error, version_not_available} -> version_not_available; + {error, starting_from_scratch} -> starting_from_scratch; + {error, _} = Err -> throw(Err); + {ok, []} -> ok; + {ok, Upgrades} -> apply_upgrades(message_store, + Upgrades, + fun() -> ok end), + ok + end, + %% Message store upgrades should be + %% the last group. + %% Backup can be deleted here. + ensure_backup_removed(), + Result. + +%% ------------------------------------------------------------------- + apply_upgrades(Scope, Upgrades, Fun) -> ok = rabbit_file:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bbec4c749d..9dbd4fdbf2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -30,9 +30,18 @@ -export([start/1, stop/0]). +%% exported for parallel map +-export([add_vhost_msg_store/1]). + %% exported for testing only -export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([move_messages_to_vhost_store/0]). +-export([stop_vhost_msg_store/1]). +-include_lib("stdlib/include/qlc.hrl"). + +-define(QUEUE_MIGRATION_BATCH_SIZE, 100). + %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on %% disk, or both. Persistent messages will have both message and @@ -334,8 +343,11 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 +-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). + -define(QUEUE, lqueue). -include("rabbit.hrl"). @@ -344,6 +356,9 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). +-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}). + +-compile(export_all). -type seq_id() :: non_neg_integer(). @@ -453,31 +468,61 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end, + {DurableQueues, #{}}, + AllTerms), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> ok = stop_msg_store(), ok = rabbit_queue_index:stop(). -start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), +start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + [?TRANSIENT_MSG_STORE_SUP, undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]). + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), + %% Start message store for all known vhosts + VHosts = rabbit_vhost:list(), + lists:foreach(fun(VHost) -> + add_vhost_msg_store(VHost) + end, + VHosts), + ok. + +add_vhost_msg_store(VHost) -> + rabbit_log:info("Starting message store vor vhost ~p~n", [VHost]), + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), + rabbit_log:info("Message store is started vor vhost ~p~n", [VHost]). stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). + ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), + ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). + +stop_vhost_msg_store(VHost) -> + rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), + rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), + ok. init(Queue, Recover, Callback) -> init( @@ -492,22 +537,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, + MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, + AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef, + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -515,12 +564,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, end}; false -> {undefined, fun(_MsgId) -> false end} end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state( + ?PERSISTENT_MSG_STORE_SUP, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -1195,14 +1246,17 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP), + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> + Callback(?MODULE, CloseFDsFun) + end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -2673,9 +2727,180 @@ multiple_routing_keys() -> %% Assumes message store is not running transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). + transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun), + transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun). transform_store(Store, TransformFun) -> rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). + +move_messages_to_vhost_store() -> + log_upgrade("Moving messages to per-vhost message store"), + Queues = list_persistent_queues(), + %% Move the queue index for each persistent queue to the new store + lists:foreach( + fun(Queue) -> + #amqqueue{name = QueueName} = Queue, + rabbit_queue_index:move_to_per_vhost_stores(QueueName) + end, + Queues), + %% Legacy (global) msg_store may require recovery. + %% This upgrade step should only be started + %% if we are upgrading from a pre-3.7.0 version. + {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues), + + OldStore = run_old_persistent_store(RecoveryRefs, StartFunState), + %% New store should not be recovered. + NewStoreSup = start_new_store_sup(), + Vhosts = rabbit_vhost:list(), + lists:foreach(fun(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost) + end, + Vhosts), + MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size, + ?QUEUE_MIGRATION_BATCH_SIZE), + in_batches(MigrationBatchSize, + {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, + QueuesWithTerms, + "Migrating batch ~p of ~p queues ~n", + "Batch ~p of ~p queues migrated ~n"), + + log_upgrade("Message store migration finished"), + delete_old_store(OldStore), + + ok = rabbit_queue_index:stop(), + ok = rabbit_sup:stop_child(NewStoreSup), + ok. + +in_batches(Size, MFA, List, MessageStart, MessageEnd) -> + in_batches(Size, 1, MFA, List, MessageStart, MessageEnd). + +in_batches(_, _, _, [], _, _) -> ok; +in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> + {Batch, Tail} = case Size > length(List) of + true -> {List, []}; + false -> lists:split(Size, List) + end, + log_upgrade(MessageStart, [BatchNum, Size]), + {M, F, A} = MFA, + Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ], + lists:foreach(fun(Key) -> + case rpc:yield(Key) of + {badrpc, Err} -> throw(Err); + _ -> ok + end + end, + Keys), + log_upgrade(MessageEnd, [BatchNum, Size]), + in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). + +migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) -> + log_upgrade_verbose( + "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", + [Name, VHost]), + OldStoreClient = get_global_store_client(OldStore), + NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup), + %% WARNING: During scan_queue_segments queue index state is being recovered + %% and terminated. This can cause side effects! + rabbit_queue_index:scan_queue_segments( + %% We migrate only persistent messages which are found in message store + %% and are not acked yet + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC) + when is_binary(MsgId) -> + migrate_message(MsgId, OldC, NewStoreClient); + (_SeqId, _MsgId, _MsgProps, + _IsPersistent, _IsDelivered, _IsAcked, OldC) -> + OldC + end, + OldStoreClient, + QueueName), + rabbit_msg_store:client_terminate(OldStoreClient), + rabbit_msg_store:client_terminate(NewStoreClient), + NewClientRef = rabbit_msg_store:client_ref(NewStoreClient), + case RecoveryTerm of + non_clean_shutdown -> ok; + Term when is_list(Term) -> + NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm, + {persistent_ref, NewClientRef}), + rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm) + end, + log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]), + {QueueName, NewClientRef}. + +migrate_message(MsgId, OldC, NewC) -> + case rabbit_msg_store:read(MsgId, OldC) of + {{ok, Msg}, OldC1} -> + ok = rabbit_msg_store:write(MsgId, Msg, NewC), + OldC1; + _ -> OldC + end. + +get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) -> + rabbit_msg_store_vhost_sup:client_init(NewStoreSup, + rabbit_guid:gen(), + fun(_,_) -> ok end, + fun() -> ok end, + VHost). + +get_global_store_client(OldStore) -> + rabbit_msg_store:client_init(OldStore, + rabbit_guid:gen(), + fun(_,_) -> ok end, + fun() -> ok end). + +list_persistent_queues() -> + Node = node(), + mnesia:async_dirty( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, + pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) + end). + +start_recovery_terms(Queues) -> + QueueNames = [Name || #amqqueue{name = Name} <- Queues], + {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames), + Refs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + {lists:zip(QueueNames, AllTerms), Refs, StartFunState}. + +run_old_persistent_store(Refs, StartFunState) -> + OldStoreName = ?PERSISTENT_MSG_STORE, + ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link, + [OldStoreName, rabbit_mnesia:dir(), + Refs, StartFunState]), + OldStoreName. + +start_new_store_sup() -> + % Start persistent store sup without recovery. + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, + rabbit_msg_store_vhost_sup, + [?PERSISTENT_MSG_STORE_SUP, + undefined, {fun (ok) -> finished end, ok}]), + ?PERSISTENT_MSG_STORE_SUP. + +delete_old_store(OldStore) -> + ok = rabbit_sup:stop_child(OldStore), + rabbit_file:recursive_delete( + [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), + %% Delete old transient store as well + rabbit_file:recursive_delete( + [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]). + +log_upgrade(Msg) -> + log_upgrade(Msg, []). + +log_upgrade(Msg, Args) -> + rabbit_log:info("message_store upgrades: " ++ Msg, Args). + +log_upgrade_verbose(Msg) -> + log_upgrade_verbose(Msg, []). + +log_upgrade_verbose(Msg, Args) -> + rabbit_log_upgrade:info(Msg, Args). diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index a27f0aca00..4e2edd19eb 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -18,7 +18,8 @@ -export([recorded/0, matches/2, desired/0, desired_for_scope/1, record_desired/0, record_desired_for_scope/1, - upgrades_required/1, check_version_consistency/3, + upgrades_required/1, all_upgrades_required/1, + check_version_consistency/3, check_version_consistency/4, check_otp_consistency/1, version_error/3]). @@ -117,6 +118,30 @@ upgrades_required(Scope) -> end, Scope) end. +all_upgrades_required(Scopes) -> + case recorded() of + {error, enoent} -> + case filelib:is_file(rabbit_guid:filename()) of + false -> {error, starting_from_scratch}; + true -> {error, version_not_available} + end; + {ok, _} -> + lists:foldl( + fun + (_, {error, Err}) -> {error, Err}; + (Scope, {ok, Acc}) -> + case upgrades_required(Scope) of + %% Lift errors from any scope. + {error, Err} -> {error, Err}; + %% Filter non-upgradable scopes + {ok, []} -> {ok, Acc}; + {ok, Upgrades} -> {ok, [{Scope, Upgrades} | Acc]} + end + end, + {ok, []}, + Scopes) + end. + %% ------------------------------------------------------------------- with_upgrade_graph(Fun, Scope) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 213dbaaa0c..6edb62425b 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -23,7 +23,8 @@ -export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). - +-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). +-export([purge_messages/1]). -spec add(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost()) -> 'ok'. @@ -95,6 +96,16 @@ delete(VHostPath) -> [ok = Fun() || Fun <- Funs], ok. +purge_messages(VHost) -> + VhostDir = msg_store_dir_path(VHost), + rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), + %% Message store is stopped to close file handles + rabbit_variable_queue:stop_vhost_msg_store(VHost), + ok = rabbit_file:recursive_delete([VhostDir]), + %% Ensure the store is terminated even if it was restarted during the delete operation + %% above. + rabbit_variable_queue:stop_vhost_msg_store(VHost). + assert_benign(ok) -> ok; assert_benign({ok, _}) -> ok; assert_benign({error, not_found}) -> ok; @@ -120,6 +131,7 @@ internal_delete(VHostPath) -> Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), + purge_messages(VHostPath), Fs1 ++ Fs2. exists(VHostPath) -> @@ -170,6 +182,23 @@ set_limits(VHost = #vhost{}, undefined) -> set_limits(VHost = #vhost{}, Limits) -> VHost#vhost{limits = Limits}. + +dir(Vhost) -> + <<Num:128>> = erlang:md5(term_to_binary(Vhost)), + rabbit_misc:format("~.36B", [Num]). + +msg_store_dir_path(VHost) -> + EncodedName = list_to_binary(dir(VHost)), + rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), + EncodedName])). + +msg_store_dir_wildcard() -> + rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])). + +msg_store_dir_base() -> + Dir = rabbit_mnesia:dir(), + filename:join([Dir, "msg_stores", "vhosts"]). + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. @@ -188,3 +217,4 @@ info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid). info_all(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()). + diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index eae7119007..59c63022d8 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -42,7 +42,7 @@ memory() -> || Names <- distinguished_interesting_sups()], Mnesia = mnesia_memory(), - MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]), + MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]), MetricsETS = ets_memory([rabbit_metrics]), MetricsProc = try [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), @@ -149,7 +149,7 @@ interesting_sups() -> [[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()]. interesting_sups0() -> - MsgIndexProcs = [msg_store_transient, msg_store_persistent], + MsgIndexProcs = [msg_store_transient_vhost, msg_store_persistent_vhost], MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), [MsgIndexProcs, MgmtDbProcs, PluginProcs]. diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 4407a24e7f..124fda47b1 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -111,8 +111,8 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). @@ -215,14 +215,27 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), - start_msg_store( - [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - StartFunState), + %% Group recovery terms by vhost. + {[], VhostRefs} = lists:foldl( + fun + %% We need to skip a queue name + (non_clean_shutdown, {[_|QNames], VhostRefs}) -> + {QNames, VhostRefs}; + (Terms, {[QueueName | QNames], VhostRefs}) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {QNames, VhostRefs}; + Ref -> + #resource{virtual_host = VHost} = QueueName, + Refs = case maps:find(VHost, VhostRefs) of + {ok, Val} -> Val; + error -> [] + end, + {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} + end + end, + {DurableQueues, #{}}, + AllTerms), + start_msg_store(VhostRefs, StartFunState), {ok, AllTerms}. stop() -> @@ -230,12 +243,21 @@ stop() -> ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]). + Refs, StartFunState]), + %% Start message store for all known vhosts + VHosts = rabbit_vhost:list(), + lists:foreach( + fun(VHost) -> + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) + end, + VHosts), + ok. stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), @@ -254,22 +276,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback); + MsgOnDiskFun, AsyncCallback, + VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), + VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + MsgOnDiskFun, AsyncCallback, + VHost), {C, fun (MsgId) when is_binary(MsgId) -> rabbit_msg_store:contains(MsgId, C); (#basic_message{is_persistent = Persistent}) -> @@ -278,11 +304,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback), + undefined, AsyncCallback, + VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -957,14 +984,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback). + Callback, VHost). -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> +msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). + rabbit_msg_store_vhost_sup:client_init( + MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end, + VHost). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index eac5fa3683..9b605d07ae 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -644,6 +644,7 @@ assert_not_clustered(Node) -> assert_failure(Fun) -> case catch Fun() of + {error, Code, Reason} -> Reason; {error, Reason} -> Reason; {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; @@ -652,35 +653,35 @@ assert_failure(Fun) -> end. stop_app(Node) -> - control_action(stop_app, Node). + rabbit_control_helper:command(stop_app, Node). start_app(Node) -> - control_action(start_app, Node). + rabbit_control_helper:command(start_app, Node). join_cluster(Node, To) -> join_cluster(Node, To, false). join_cluster(Node, To, Ram) -> - control_action(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]). + rabbit_control_helper:command_with_output(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]). reset(Node) -> - control_action(reset, Node). + rabbit_control_helper:command(reset, Node). force_reset(Node) -> - control_action(force_reset, Node). + rabbit_control_helper:command(force_reset, Node). forget_cluster_node(Node, Removee, RemoveWhenOffline) -> - control_action(forget_cluster_node, Node, [atom_to_list(Removee)], + rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)], [{"--offline", RemoveWhenOffline}]). forget_cluster_node(Node, Removee) -> forget_cluster_node(Node, Removee, false). change_cluster_node_type(Node, Type) -> - control_action(change_cluster_node_type, Node, [atom_to_list(Type)]). + rabbit_control_helper:command(change_cluster_node_type, Node, [atom_to_list(Type)]). update_cluster_nodes(Node, DiscoveryNode) -> - control_action(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]). + rabbit_control_helper:command(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]). stop_join_start(Node, ClusterTo, Ram) -> ok = stop_app(Node), @@ -695,17 +696,6 @@ stop_reset_start(Node) -> ok = reset(Node), ok = start_app(Node). -control_action(Command, Node) -> - control_action(Command, Node, [], []). - -control_action(Command, Node, Args) -> - control_action(Command, Node, Args, []). - -control_action(Command, Node, Args, Opts) -> - rpc:call(Node, rabbit_control_main, action, - [Command, Node, Args, Opts, - fun io:format/2]). - declare(Ch, Name) -> Res = amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}), diff --git a/test/eager_sync_SUITE.erl b/test/eager_sync_SUITE.erl index 93b308b6c5..70d7080269 100644 --- a/test/eager_sync_SUITE.erl +++ b/test/eager_sync_SUITE.erl @@ -23,7 +23,7 @@ -define(QNAME, <<"ha.two.test">>). -define(QNAME_AUTO, <<"ha.auto.test">>). --define(MESSAGE_COUNT, 2000). +-define(MESSAGE_COUNT, 200000). all() -> [ @@ -135,9 +135,11 @@ eager_sync_cancel(Config) -> amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, durable = true}), {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence - eager_sync_cancel_test2(Config, A, B, C, Ch). + eager_sync_cancel_test2(Config, A, B, C, Ch, 100). -eager_sync_cancel_test2(Config, A, B, C, Ch) -> +eager_sync_cancel_test2(_, _, _, _, _, 0) -> + error(no_more_attempts_left); +eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts) -> %% Sync then cancel rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), restart(Config, A), @@ -158,12 +160,12 @@ eager_sync_cancel_test2(Config, A, B, C, Ch) -> %% Damn. Syncing finished between wait_for_syncing/3 and %% sync_cancel/2 above. Start again. amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), - eager_sync_cancel_test2(Config, A, B, C, Ch) + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) end; synced_already -> %% Damn. Syncing finished before wait_for_syncing/3. Start again. amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), - eager_sync_cancel_test2(Config, A, B, C, Ch) + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) end. eager_sync_auto(Config) -> @@ -240,8 +242,8 @@ wait_for_sync(Node, QName) -> sync_detection_SUITE:wait_for_sync_status(true, Node, QName). action(Node, Action, QName) -> - rabbit_ct_broker_helpers:control_action( - Action, Node, [binary_to_list(QName)], [{"-p", "/"}]). + rabbit_control_helper:command_with_output( + Action, Node, [binary_to_list(QName)], [{"-p", "/"}]). queue(Node, QName) -> QNameRes = rabbit_misc:r(<<"/">>, queue, QName), @@ -273,6 +275,6 @@ state(Node, QName) -> %% in order to pass, because a SyncBatchSize >= ?MESSAGE_COUNT will %% always finish before the test is able to cancel the sync. set_app_sync_batch_size(Node) -> - rabbit_ct_broker_helpers:control_action( + rabbit_control_helper:command( eval, Node, ["application:set_env(rabbit, mirroring_sync_batch_size, 1)."]). diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index 592e57c41a..efc5ca830e 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -795,7 +795,7 @@ set_vhost_connection_limit(Config, VHost, Count) -> set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> Node = rabbit_ct_broker_helpers:get_node_config( Config, NodeIndex, nodename), - rabbit_ct_broker_helpers:control_action( + ok = rabbit_ct_broker_helpers:control_action( set_vhost_limits, Node, ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], [{"-p", binary_to_list(VHost)}]). diff --git a/test/per_vhost_msg_store_SUITE.erl b/test/per_vhost_msg_store_SUITE.erl new file mode 100644 index 0000000000..4d88c84b7e --- /dev/null +++ b/test/per_vhost_msg_store_SUITE.erl @@ -0,0 +1,254 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_msg_store_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(MSGS_COUNT, 100). + +all() -> + [ + publish_to_different_dirs, + storage_deleted_on_vhost_delete, + single_vhost_storage_delete_is_safe + ]. + + + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, ?MODULE}]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, [{queue_index_embed_msgs_below, 100}]}), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(_, Config) -> + Vhost1 = <<"vhost1">>, + Vhost2 = <<"vhost2">>, + rabbit_ct_broker_helpers:add_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:add_vhost(Config, Vhost2), + Chan1 = open_channel(Vhost1, Config), + Chan2 = open_channel(Vhost2, Config), + rabbit_ct_helpers:set_config( + Config, + [{vhost1, Vhost1}, {vhost2, Vhost2}, + {channel1, Chan1}, {channel2, Chan2}]). + +end_per_testcase(single_vhost_storage_delete_is_safe, Config) -> + Config; +end_per_testcase(_, Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost2), + Config. + +publish_to_different_dirs(Config) -> + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + FolderSize1 = get_folder_size(Vhost1, Config), + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel1, Queue1), + %% First storage increased + FolderSize11 = get_folder_size(Vhost1, Config), + true = (FolderSize1 < FolderSize11), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel1, Queue1), + %% First storage increased + FolderSize12 = get_folder_size(Vhost1, Config), + true = (FolderSize11 < FolderSize12), + %% Second storage didn't increased + FolderSize2 = get_folder_size(Vhost2, Config), + + %% Publish message to a queue index + publish_persistent_messages(index, Channel2, Queue2), + %% First storage increased + FolderSize21 = get_folder_size(Vhost2, Config), + true = (FolderSize2 < FolderSize21), + %% Second storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config), + + %% Publish message to a message store + publish_persistent_messages(store, Channel2, Queue2), + %% Second storage increased + FolderSize22 = get_folder_size(Vhost2, Config), + true = (FolderSize21 < FolderSize22), + %% First storage didn't increased + FolderSize12 = get_folder_size(Vhost1, Config). + +storage_deleted_on_vhost_delete(Config) -> + Vhost1 = ?config(vhost1, Config), + Channel1 = ?config(channel1, Config), + Queue1 = declare_durable_queue(Channel1), + FolderSize = get_global_folder_size(Config), + + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + FolderSizeAfterPublish = get_global_folder_size(Config), + + %% Total storage size increased + true = (FolderSize < FolderSizeAfterPublish), + + ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1), + + %% Total memory reduced + FolderSizeAfterDelete = get_global_folder_size(Config), + true = (FolderSizeAfterPublish > FolderSizeAfterDelete), + + %% There is no Vhost1 folder + 0 = get_folder_size(Vhost1, Config). + + +single_vhost_storage_delete_is_safe(Config) -> +ct:pal("Start test 3", []), + Vhost1 = ?config(vhost1, Config), + Vhost2 = ?config(vhost2, Config), + Channel1 = ?config(channel1, Config), + Channel2 = ?config(channel2, Config), + Queue1 = declare_durable_queue(Channel1), + Queue2 = declare_durable_queue(Channel2), + + %% Publish messages to both stores + publish_persistent_messages(index, Channel1, Queue1), + publish_persistent_messages(store, Channel1, Queue1), + publish_persistent_messages(index, Channel2, Queue2), + publish_persistent_messages(store, Channel2, Queue2), + + queue_is_not_empty(Channel2, Queue2), + % Vhost2Dir = vhost_dir(Vhost2, Config), + % [StoreFile] = filelib:wildcard(binary_to_list(filename:join([Vhost2Dir, "msg_store_persistent_*", "0.rdq"]))), + % ct:pal("Store file ~p~n", [file:read_file(StoreFile)]). +% ok. + rabbit_ct_broker_helpers:stop_broker(Config, 0), + delete_vhost_data(Vhost1, Config), + rabbit_ct_broker_helpers:start_broker(Config, 0), + + Channel11 = open_channel(Vhost1, Config), + Channel21 = open_channel(Vhost2, Config), + + %% There are no Vhost1 messages + queue_is_empty(Channel11, Queue1), + + %% But Vhost2 messages are in place + queue_is_not_empty(Channel21, Queue2), + consume_messages(index, Channel21, Queue2), + consume_messages(store, Channel21, Queue2). + +declare_durable_queue(Channel) -> + QName = list_to_binary(erlang:ref_to_list(make_ref())), + #'queue.declare_ok'{queue = QName} = + amqp_channel:call(Channel, + #'queue.declare'{queue = QName, durable = true}), + QName. + +publish_persistent_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + amqp_channel:call(Channel, #'confirm.select'{}), + [amqp_channel:call(Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = MessagePayload}) + || _ <- lists:seq(1, ?MSGS_COUNT)], + amqp_channel:wait_for_confirms(Channel). + + +get_folder_size(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + folder_size(Dir). + +folder_size(Dir) -> + filelib:fold_files(Dir, ".*", true, + fun(F,Acc) -> filelib:file_size(F) + Acc end, 0). + +get_global_folder_size(Config) -> + BaseDir = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, dir, []), + folder_size(BaseDir). + +vhost_dir(Vhost, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost, msg_store_dir_path, [Vhost]). + +delete_vhost_data(Vhost, Config) -> + Dir = vhost_dir(Vhost, Config), + rabbit_file:recursive_delete([Dir]). + +queue_is_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = 0} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}). + +queue_is_not_empty(Channel, Queue) -> + #'queue.declare_ok'{queue = Queue, message_count = MsgCount} = + amqp_channel:call(Channel, + #'queue.declare'{ queue = Queue, + durable = true, + passive = true}), + ExpectedCount = ?MSGS_COUNT * 2, + ExpectedCount = MsgCount. + +consume_messages(Storage, Channel, Queue) -> + MessagePayload = case Storage of + index -> binary:copy(<<"=">>, 50); + store -> binary:copy(<<"-">>, 150) + end, + lists:foreach( + fun(I) -> + ct:pal("Consume message ~p~n ~p~n", [I, MessagePayload]), + {#'basic.get_ok'{}, Content} = + amqp_channel:call(Channel, + #'basic.get'{queue = Queue, + no_ack = true}), + #amqp_msg{payload = MessagePayload} = Content + end, + lists:seq(1, ?MSGS_COUNT)), + ok. + +open_channel(Vhost, Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, Conn} = amqp_connection:start( + #amqp_params_direct{node = Node, virtual_host = Vhost}), + {ok, Chan} = amqp_connection:open_channel(Conn), + Chan. diff --git a/test/plugins_SUITE.erl b/test/plugins_SUITE.erl deleted file mode 100644 index 8896298df1..0000000000 --- a/test/plugins_SUITE.erl +++ /dev/null @@ -1,80 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved. -%% - --module(plugins_SUITE). - --include_lib("common_test/include/ct.hrl"). - --compile(export_all). - -all() -> - [ - active_with_single_plugin_dir, - active_with_multiple_plugin_dirs - ]. - -%% ------------------------------------------------------------------- -%% Testsuite setup/teardown. -%% ------------------------------------------------------------------- - -init_per_suite(Config) -> - rabbit_ct_helpers:log_environment(), - application:load(rabbit), - rabbit_ct_helpers:run_setup_steps(Config). - -end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config). - -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_started(Config, Testcase). - -end_per_testcase(Testcase, Config) -> - rabbit_ct_helpers:testcase_finished(Config, Testcase). - -%% ------------------------------------------------------------------- -%% Testcases. -%% ------------------------------------------------------------------- - -active_with_single_plugin_dir(Config) -> - DataDir = rabbit_ct_helpers:get_config(Config, data_dir), - PluginsDir1 = filename:join(DataDir, "plugins1"), - - true = code:add_path(filename:join([PluginsDir1, - "mock_rabbitmq_plugins_01-0.1.0.ez", - "mock_rabbitmq_plugins_01-0.1.0", "ebin"])), - {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01), - application:set_env(rabbit, plugins_dir, PluginsDir1), - - [mock_rabbitmq_plugins_01] = rabbit_plugins:active(). - -active_with_multiple_plugin_dirs(Config) -> - DataDir = rabbit_ct_helpers:get_config(Config, data_dir), - PluginsDir1 = filename:join(DataDir, "plugins1"), - PluginsDir2 = filename:join(DataDir, "plugins2"), - - true = code:add_path(filename:join([PluginsDir1, - "mock_rabbitmq_plugins_01-0.1.0.ez", - "mock_rabbitmq_plugins_01-0.1.0", "ebin"])), - {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01), - application:set_env(rabbit, plugins_dir, PluginsDir1 ++ ":" ++ PluginsDir2), - - [mock_rabbitmq_plugins_01] = rabbit_plugins:active(). diff --git a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez b/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez Binary files differdeleted file mode 100644 index 40cba9f16b..0000000000 --- a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez +++ /dev/null diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index ef85472f48..bbc4447102 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -31,7 +31,6 @@ -export([list_queues_local/1 ,list_queues_offline/1 ,list_queues_online/1 - ,manage_global_parameters/1 ]). all() -> @@ -46,8 +45,7 @@ groups() -> [list_queues_local ,list_queues_online ,list_queues_offline - ]}, - {global_parameters, [], [manage_global_parameters]} + ]} ]. init_per_suite(Config) -> @@ -63,13 +61,6 @@ init_per_group(list_queues, Config0) -> Config1 = declare_some_queues(Config), rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1), Config1; -init_per_group(global_parameters,Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); init_per_group(_, Config) -> Config. @@ -144,75 +135,6 @@ list_queues_offline(Config) -> assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), ok. -manage_global_parameters(Config) -> - 0 = length(global_parameters(Config)), - Parameter1Key = global_param1, - GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter1Key), - GlobalParameter1ValueAsString - ]), - - 1 = length(global_parameters(Config)), - - GlobalParameter1Value = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value, - - Parameter2Key = global_param2, - GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter2Key), - GlobalParameter2ValueAsString - ]), - - 2 = length(global_parameters(Config)), - - GlobalParameter2Value = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter2Key] - ), - - [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value, - - - GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}", - ok = control_action(Config, set_global_parameter, - [atom_to_list(Parameter1Key), - GlobalParameter1Value2AsString - ]), - - 2 = length(global_parameters(Config)), - - GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2, - - ok = control_action(Config, clear_global_parameter, - [atom_to_list(Parameter1Key)] - ), - - 1 = length(global_parameters(Config)), - - not_found = rabbit_ct_broker_helpers:rpc( - Config, 0, - rabbit_runtime_parameters, value_global, - [Parameter1Key] - ), - - ok = control_action(Config, list_global_parameters, []), - - ok. - %%---------------------------------------------------------------------------- %% Helpers %%---------------------------------------------------------------------------- @@ -234,11 +156,7 @@ run_list_queues(Config, Node, Args) -> control_action(Config, Command, Args) -> Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - rabbit_control_main:action( - Command, Node, Args, [], - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end). + rabbit_ct_broker_helpers:control_action(Command, Node, Args, []). global_parameters(Config) -> rabbit_ct_broker_helpers:rpc( diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl index 1e0a66e8fd..3e5ed8208b 100644 --- a/test/sync_detection_SUITE.erl +++ b/test/sync_detection_SUITE.erl @@ -210,7 +210,7 @@ slave_pids(Node, Queue) -> _ -> Pids end. -%% The mnesia syncronization takes a while, but we don't want to wait for the +%% The mnesia synchronization takes a while, but we don't want to wait for the %% test to fail, since the timetrap is quite high. wait_for_sync_status(Status, Node, Queue) -> Max = 10000 / ?LOOP_RECURSION_DELAY, diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index a7f2ef4603..af86371fc2 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -22,8 +22,8 @@ -compile(export_all). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). @@ -70,7 +70,8 @@ all() -> [ {group, parallel_tests}, - {group, non_parallel_tests}, + {group, non_parallel_tests} + , {group, backing_queue_tests}, {group, cluster_tests}, @@ -88,24 +89,12 @@ groups() -> credit_flow_settings, dynamic_mirroring, gen_server2_with_state, - list_operations_timeout_pass, mcall, {password_hashing, [], [ password_hashing, change_password ]}, - {policy_validation, [parallel, {repeat, 20}], [ - ha_policy_validation, - policy_validation, - policy_opts_validation, - queue_master_location_policy_validation, - queue_modes_policy_validation, - vhost_removed_while_updating_policy - ]}, - runtime_parameters, - set_disk_free_limit_command, - topic_matching, - user_management + topic_matching ]}, {non_parallel_tests, [], [ app_management, %% Restart RabbitMQ. @@ -115,9 +104,7 @@ groups() -> head_message_timestamp_statistics, %% Expect specific statistics. log_management, %% Check log files. log_management_during_startup, %% Check log files. - memory_high_watermark, %% Trigger alarm. - externally_rotated_logs_are_automatically_reopened, %% Check log files. - server_status %% Trigger alarm. + externally_rotated_logs_are_automatically_reopened %% Check log files. ]}, {backing_queue_tests, [], [ msg_store, @@ -265,25 +252,41 @@ app_management(Config) -> ?MODULE, app_management1, [Config]). app_management1(_Config) -> - control_action(wait, [os:getenv("RABBITMQ_PID_FILE")]), + wait_for_application(rabbit), %% Starting, stopping and diagnostics. Note that we don't try %% 'report' when the rabbit app is stopped and that we enable %% tracing for the duration of this function. - ok = control_action(trace_on, []), - ok = control_action(stop_app, []), - ok = control_action(stop_app, []), - ok = control_action(status, []), - ok = control_action(cluster_status, []), - ok = control_action(environment, []), - ok = control_action(start_app, []), - ok = control_action(start_app, []), - ok = control_action(status, []), - ok = control_action(report, []), - ok = control_action(cluster_status, []), - ok = control_action(environment, []), - ok = control_action(trace_off, []), + ok = rabbit_trace:start(<<"/">>), + ok = rabbit:stop(), + ok = rabbit:stop(), + ok = no_exceptions(rabbit, status, []), + ok = no_exceptions(rabbit, environment, []), + ok = rabbit:start(), + ok = rabbit:start(), + ok = no_exceptions(rabbit, status, []), + ok = no_exceptions(rabbit, environment, []), + ok = rabbit_trace:stop(<<"/">>), passed. +no_exceptions(Mod, Fun, Args) -> + try erlang:apply(Mod, Fun, Args) of _ -> ok + catch Type:Ex -> {Type, Ex} + end. + +wait_for_application(Application) -> + wait_for_application(Application, 5000). + +wait_for_application(_, Time) when Time =< 0 -> + {error, timeout}; +wait_for_application(Application, Time) -> + Interval = 100, + case lists:keyfind(Application, 1, application:which_applications()) of + false -> + timer:sleep(Interval), + wait_for_application(Application, Time - Interval); + _ -> ok + end. + %% ------------------------------------------------------------------- %% Message store. %% ------------------------------------------------------------------- @@ -347,7 +350,7 @@ msg_store1(_Config) -> %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( - [], {fun ([]) -> finished; + #{}, {fun ([]) -> finished; ([MsgId|MsgIdsTail]) when length(MsgIdsTail) rem 2 == 0 -> {MsgId, 1, MsgIdsTail}; @@ -468,10 +471,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store:client_init( + {Pid, rabbit_msg_store_vhost_sup:client_init( MsgStore, Ref, fun (MsgIds, _ActionTaken) -> Pid ! {on_disk, MsgIds} - end, undefined)}. + end, undefined, <<"/">>)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -548,14 +551,14 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store:client_init( + MSCState = rabbit_msg_store_vhost_sup:client_init( ?PERSISTENT_MSG_STORE, Ref, fun (MsgIds, _ActionTaken) -> case gb_sets:is_member(MsgId, MsgIds) of true -> Self ! on_disk; false -> ok end - end, undefined), + end, undefined, <<"/">>), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -1424,7 +1427,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). + rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( @@ -1842,7 +1845,7 @@ log_management(Config) -> ?MODULE, log_management1, [Config]). log_management1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", ok = test_logs_working([LogFile]), @@ -1852,7 +1855,7 @@ log_management1(_Config) -> ok = test_logs_working([LogFile]), %% simple log rotation - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), %% FIXME: rabbit:rotate_logs/0 is asynchronous due to a limitation %% in Lager. Therefore, we have no choice but to wait an arbitrary %% amount of time. @@ -1862,53 +1865,53 @@ log_management1(_Config) -> %% log rotation on empty files ok = clean_logs([LogFile], Suffix), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), [{error, enoent}, true] = non_empty_files([LogFile ++ Suffix, LogFile]), %% logs with suffix are not writable - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), ok = make_files_non_writable([LogFile ++ Suffix]), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), ok = test_logs_working([LogFile]), %% rotate when original log files are not writable ok = make_files_non_writable([LogFile]), - ok = control_action(rotate_logs, []), + ok = rabbit:rotate_logs(), timer:sleep(2000), %% logging directed to tty (first, remove handlers) - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, tty), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), timer:sleep(200), rabbit_log:info("test info"), [{error, enoent}] = empty_files([LogFile]), %% rotate logs when logging is turned off - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, false), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), timer:sleep(200), rabbit_log:error("test error"), timer:sleep(200), [{error, enoent}] = empty_files([LogFile]), %% cleanup - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, LogFile), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), ok = test_logs_working([LogFile]), passed. @@ -1917,84 +1920,80 @@ log_management_during_startup(Config) -> ?MODULE, log_management_during_startup1, [Config]). log_management_during_startup1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), Suffix = ".0", %% start application with simple tty logging - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = clean_logs([LogFile], Suffix), ok = application:set_env(rabbit, lager_handler, tty), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), %% start application with logging to non-existing directory NonExistent = "/tmp/non-existent/test.log", delete_file(NonExistent), delete_file(filename:dirname(NonExistent)), - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = application:set_env(rabbit, lager_handler, NonExistent), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), %% start application with logging to directory with no %% write permissions - ok = control_action(stop_app, []), + ok = rabbit:stop(), NoPermission1 = "/var/empty/test.log", delete_file(NoPermission1), delete_file(filename:dirname(NoPermission1)), - ok = control_action(stop_app, []), + ok = rabbit:stop(), ok = application:set_env(rabbit, lager_handler, NoPermission1), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = case control_action(start_app, []) of - ok -> exit({got_success_but_expected_failure, - log_rotation_no_write_permission_dir_test}); - {badrpc, - {'EXIT', {error, {cannot_log_to_file, _, Reason1}}}} - when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; - {badrpc, - {'EXIT', - {error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason1}}}}} - when Reason1 =:= eperm orelse - Reason1 =:= eacces orelse - Reason1 =:= enoent-> ok - end, + ok = try rabbit:start() of + ok -> exit({got_success_but_expected_failure, + log_rotation_no_write_permission_dir_test}) + catch + _:{error, {cannot_log_to_file, _, Reason1}} + when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; + _:{error, {cannot_log_to_file, _, + {cannot_create_parent_dirs, _, Reason1}}} + when Reason1 =:= eperm orelse + Reason1 =:= eacces orelse + Reason1 =:= enoent-> ok + end, %% start application with logging to a subdirectory which %% parent directory has no write permissions NoPermission2 = "/var/empty/non-existent/test.log", delete_file(NoPermission2), delete_file(filename:dirname(NoPermission2)), - case control_action(stop_app, []) of + case rabbit:stop() of ok -> ok; {error, lager_not_running} -> ok end, ok = application:set_env(rabbit, lager_handler, NoPermission2), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = case control_action(start_app, []) of - ok -> exit({got_success_but_expected_failure, - log_rotatation_parent_dirs_test}); - {badrpc, - {'EXIT', {error, {cannot_log_to_file, _, Reason2}}}} - when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; - {badrpc, - {'EXIT', - {error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason2}}}}} - when Reason2 =:= eperm orelse - Reason2 =:= eacces orelse - Reason2 =:= enoent-> ok - end, + ok = try rabbit:start() of + ok -> exit({got_success_but_expected_failure, + log_rotatation_parent_dirs_test}) + catch + _:{error, {cannot_log_to_file, _, Reason2}} + when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; + _:{error, {cannot_log_to_file, _, + {cannot_create_parent_dirs, _, Reason2}}} + when Reason2 =:= eperm orelse + Reason2 =:= eacces orelse + Reason2 =:= enoent-> ok + end, %% cleanup ok = application:set_env(rabbit, lager_handler, LogFile), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = control_action(start_app, []), + ok = rabbit:start(), passed. externally_rotated_logs_are_automatically_reopened(Config) -> @@ -2002,7 +2001,7 @@ externally_rotated_logs_are_automatically_reopened(Config) -> ?MODULE, externally_rotated_logs_are_automatically_reopened1, [Config]). externally_rotated_logs_are_automatically_reopened1(_Config) -> - [LogFile] = rabbit:log_locations(), + [LogFile|_] = rabbit:log_locations(), %% Make sure log file is opened ok = test_logs_working([LogFile]), @@ -2185,599 +2184,6 @@ change_password1(_Config) -> %% rabbitmqctl. %% ------------------------------------------------------------------- -list_operations_timeout_pass(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, list_operations_timeout_pass1, [Config]). - -list_operations_timeout_pass1(Config) -> - %% create a few things so there is some useful information to list - {_Writer1, Limiter1, Ch1} = rabbit_ct_broker_helpers:test_channel(), - {_Writer2, Limiter2, Ch2} = rabbit_ct_broker_helpers:test_channel(), - - [Q, Q2] = [Queue || Name <- [<<"list_operations_timeout_pass-q1">>, - <<"list_operations_timeout_pass-q2">>], - {new, Queue = #amqqueue{}} <- - [rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none)]], - - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch1, Limiter1, false, 0, <<"ctag1">>, true, [], - undefined), - ok = rabbit_amqqueue:basic_consume( - Q2, true, Ch2, Limiter2, false, 0, <<"ctag2">>, true, [], - undefined), - - %% list users - ok = control_action(add_user, - ["list_operations_timeout_pass-user", - "list_operations_timeout_pass-password"]), - {error, {user_already_exists, _}} = - control_action(add_user, - ["list_operations_timeout_pass-user", - "list_operations_timeout_pass-password"]), - ok = control_action_t(list_users, [], ?TIMEOUT_LIST_OPS_PASS), - - %% list parameters - ok = dummy_runtime_parameters:register(), - ok = control_action(set_parameter, ["test", "good", "123"]), - ok = control_action_t(list_parameters, [], ?TIMEOUT_LIST_OPS_PASS), - ok = control_action(clear_parameter, ["test", "good"]), - dummy_runtime_parameters:unregister(), - - %% list vhosts - ok = control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]), - {error, {vhost_already_exists, _}} = - control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]), - ok = control_action_t(list_vhosts, [], ?TIMEOUT_LIST_OPS_PASS), - - %% list permissions - ok = control_action(set_permissions, - ["list_operations_timeout_pass-user", ".*", ".*", ".*"], - [{"-p", "/list_operations_timeout_pass-vhost"}]), - ok = control_action_t(list_permissions, [], - [{"-p", "/list_operations_timeout_pass-vhost"}], - ?TIMEOUT_LIST_OPS_PASS), - - %% list user permissions - ok = control_action_t(list_user_permissions, - ["list_operations_timeout_pass-user"], - ?TIMEOUT_LIST_OPS_PASS), - - %% list policies - ok = control_action_opts( - ["set_policy", "list_operations_timeout_pass-policy", ".*", - "{\"ha-mode\":\"all\"}"]), - ok = control_action_t(list_policies, [], ?TIMEOUT_LIST_OPS_PASS), - ok = control_action(clear_policy, ["list_operations_timeout_pass-policy"]), - - %% list queues - ok = info_action_t(list_queues, - rabbit_amqqueue:info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list exchanges - ok = info_action_t(list_exchanges, - rabbit_exchange:info_keys(), true, - ?TIMEOUT_LIST_OPS_PASS), - - %% list bindings - ok = info_action_t(list_bindings, - rabbit_binding:info_keys(), true, - ?TIMEOUT_LIST_OPS_PASS), - - %% list connections - H = ?config(rmq_hostname, Config), - P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - {ok, C1} = gen_tcp:connect(H, P, [binary, {active, false}]), - gen_tcp:send(C1, <<"AMQP", 0, 0, 9, 1>>), - {ok, <<1,0,0>>} = gen_tcp:recv(C1, 3, 100), - - {ok, C2} = gen_tcp:connect(H, P, [binary, {active, false}]), - gen_tcp:send(C2, <<"AMQP", 0, 0, 9, 1>>), - {ok, <<1,0,0>>} = gen_tcp:recv(C2, 3, 100), - - ok = info_action_t( - list_connections, rabbit_networking:connection_info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list consumers - ok = info_action_t( - list_consumers, rabbit_amqqueue:consumer_info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% list channels - ok = info_action_t( - list_channels, rabbit_channel:info_keys(), false, - ?TIMEOUT_LIST_OPS_PASS), - - %% do some cleaning up - ok = control_action(delete_user, ["list_operations_timeout_pass-user"]), - {error, {no_such_user, _}} = - control_action(delete_user, ["list_operations_timeout_pass-user"]), - - ok = control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]), - - %% close_connection - Conns = rabbit_ct_broker_helpers:get_connection_pids([C1, C2]), - [ok, ok] = [ok = control_action( - close_connection, [rabbit_misc:pid_to_string(ConnPid), "go away"]) - || ConnPid <- Conns], - - %% cleanup queues - [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], - - [begin - unlink(Chan), - ok = rabbit_channel:shutdown(Chan) - end || Chan <- [Ch1, Ch2]], - passed. - -user_management(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, user_management1, [Config]). - -user_management1(_Config) -> - - %% lots if stuff that should fail - {error, {no_such_user, _}} = - control_action(delete_user, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(change_password, - ["user_management-user", "user_management-password"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, - ["/user_management-vhost"]), - {error, {no_such_user, _}} = - control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"]), - {error, {no_such_user, _}} = - control_action(clear_permissions, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(list_user_permissions, - ["user_management-user"]), - {error, {no_such_vhost, _}} = - control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, - ["guest", "+foo", ".*", ".*"]), - {error, {no_such_user, _}} = - control_action(set_user_tags, - ["user_management-user", "bar"]), - - %% user creation - ok = control_action(add_user, - ["user_management-user", "user_management-password"]), - {error, {user_already_exists, _}} = - control_action(add_user, - ["user_management-user", "user_management-password"]), - ok = control_action(clear_password, - ["user_management-user"]), - ok = control_action(change_password, - ["user_management-user", "user_management-newpassword"]), - - TestTags = fun (Tags) -> - Args = ["user_management-user" | [atom_to_list(T) || T <- Tags]], - ok = control_action(set_user_tags, Args), - {ok, #internal_user{tags = Tags}} = - rabbit_auth_backend_internal:lookup_user( - <<"user_management-user">>), - ok = control_action(list_users, []) - end, - TestTags([foo, bar, baz]), - TestTags([administrator]), - TestTags([]), - - %% user authentication - ok = control_action(authenticate_user, - ["user_management-user", "user_management-newpassword"]), - {refused, _User, _Format, _Params} = - control_action(authenticate_user, - ["user_management-user", "user_management-password"]), - - %% vhost creation - ok = control_action(add_vhost, - ["/user_management-vhost"]), - {error, {vhost_already_exists, _}} = - control_action(add_vhost, - ["/user_management-vhost"]), - ok = control_action(list_vhosts, []), - - %% user/vhost mapping - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_permissions, [], - [{"-p", "/user_management-vhost"}]), - ok = control_action(list_user_permissions, - ["user_management-user"]), - - %% user/vhost unmapping - ok = control_action(clear_permissions, - ["user_management-user"], [{"-p", "/user_management-vhost"}]), - ok = control_action(clear_permissions, - ["user_management-user"], [{"-p", "/user_management-vhost"}]), - - %% vhost deletion - ok = control_action(delete_vhost, - ["/user_management-vhost"]), - {error, {no_such_vhost, _}} = - control_action(delete_vhost, - ["/user_management-vhost"]), - - %% deleting a populated vhost - ok = control_action(add_vhost, - ["/user_management-vhost"]), - ok = control_action(set_permissions, - ["user_management-user", ".*", ".*", ".*"], - [{"-p", "/user_management-vhost"}]), - {new, _} = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/user_management-vhost">>, queue, - <<"user_management-vhost-queue">>), - true, false, [], none), - ok = control_action(delete_vhost, - ["/user_management-vhost"]), - - %% user deletion - ok = control_action(delete_user, - ["user_management-user"]), - {error, {no_such_user, _}} = - control_action(delete_user, - ["user_management-user"]), - - passed. - -runtime_parameters(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, runtime_parameters1, [Config]). - -runtime_parameters1(_Config) -> - dummy_runtime_parameters:register(), - Good = fun(L) -> ok = control_action(set_parameter, L) end, - Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end, - - %% Acceptable for bijection - Good(["test", "good", "\"ignore\""]), - Good(["test", "good", "123"]), - Good(["test", "good", "true"]), - Good(["test", "good", "false"]), - Good(["test", "good", "null"]), - Good(["test", "good", "{\"key\": \"value\"}"]), - - %% Invalid json - Bad(["test", "good", "atom"]), - Bad(["test", "good", "{\"foo\": \"bar\""]), - Bad(["test", "good", "{foo: \"bar\"}"]), - - %% Test actual validation hook - Good(["test", "maybe", "\"good\""]), - Bad(["test", "maybe", "\"bad\""]), - Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none - - ok = control_action(list_parameters, []), - - ok = control_action(clear_parameter, ["test", "good"]), - ok = control_action(clear_parameter, ["test", "maybe"]), - ok = control_action(clear_parameter, ["test", "admin"]), - {error_string, _} = - control_action(clear_parameter, ["test", "neverexisted"]), - - %% We can delete for a component that no longer exists - Good(["test", "good", "\"ignore\""]), - dummy_runtime_parameters:unregister(), - ok = control_action(clear_parameter, ["test", "good"]), - passed. - -policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, policy_validation1, [Config]). - -policy_validation1(_Config) -> - PolicyName = "runtime_parameters-policy", - dummy_runtime_parameters:register_policy_validator(), - SetPol = fun (Key, Val) -> - control_action_opts( - ["set_policy", PolicyName, ".*", - rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) - end, - OK = fun (Key, Val) -> - ok = SetPol(Key, Val), - true = does_policy_exist(PolicyName, - [{definition, [{list_to_binary(Key), Val}]}]) - end, - - OK("testeven", []), - OK("testeven", [1, 2]), - OK("testeven", [1, 2, 3, 4]), - OK("testpos", [2, 5, 5678]), - - {error_string, _} = SetPol("testpos", [-1, 0, 1]), - {error_string, _} = SetPol("testeven", [ 1, 2, 3]), - - ok = control_action(clear_policy, [PolicyName]), - dummy_runtime_parameters:unregister_policy_validator(), - passed. - -policy_opts_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, policy_opts_validation1, [Config]). - -policy_opts_validation1(_Config) -> - PolicyName = "policy_opts_validation-policy", - Set = fun (Extra) -> control_action_opts( - ["set_policy", PolicyName, - ".*", "{\"ha-mode\":\"all\"}" - | Extra]) end, - OK = fun (Extra, Props) -> - ok = Set(Extra), - true = does_policy_exist(PolicyName, Props) - end, - Fail = fun (Extra) -> - case Set(Extra) of - {error_string, _} -> ok; - no_command when Extra =:= ["--priority"] -> ok; - no_command when Extra =:= ["--apply-to"] -> ok; - {'EXIT', - {function_clause, - [{rabbit_control_main,action, _, _} | _]}} - when Extra =:= ["--offline"] -> ok - end - end, - - OK ([], [{priority, 0}, {'apply-to', <<"all">>}]), - - OK (["--priority", "0"], [{priority, 0}]), - OK (["--priority", "3"], [{priority, 3}]), - Fail(["--priority", "banana"]), - Fail(["--priority"]), - - OK (["--apply-to", "all"], [{'apply-to', <<"all">>}]), - OK (["--apply-to", "queues"], [{'apply-to', <<"queues">>}]), - Fail(["--apply-to", "bananas"]), - Fail(["--apply-to"]), - - OK (["--priority", "3", "--apply-to", "queues"], [{priority, 3}, {'apply-to', <<"queues">>}]), - Fail(["--priority", "banana", "--apply-to", "queues"]), - Fail(["--priority", "3", "--apply-to", "bananas"]), - - Fail(["--offline"]), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -ha_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, ha_policy_validation1, [Config]). - -ha_policy_validation1(_Config) -> - PolicyName = "ha_policy_validation-policy", - Set = fun (JSON) -> control_action_opts( - ["set_policy", PolicyName, - ".*", JSON]) end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"ha-mode\":\"all\"}", [{<<"ha-mode">>, <<"all">>}]), - Fail("{\"ha-mode\":\"made_up\"}"), - - Fail("{\"ha-mode\":\"nodes\"}"), - Fail("{\"ha-mode\":\"nodes\",\"ha-params\":2}"), - Fail("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",2]}"), - OK ("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",\"b\"]}", - [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [<<"a">>, <<"b">>]}]), - Fail("{\"ha-params\":[\"a\",\"b\"]}"), - - Fail("{\"ha-mode\":\"exactly\"}"), - Fail("{\"ha-mode\":\"exactly\",\"ha-params\":[\"a\",\"b\"]}"), - OK ("{\"ha-mode\":\"exactly\",\"ha-params\":2}", - [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}]), - Fail("{\"ha-params\":2}"), - - OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"manual\"}", - [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"manual">>}]), - OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"automatic\"}", - [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"automatic">>}]), - Fail("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"made_up\"}"), - Fail("{\"ha-sync-mode\":\"manual\"}"), - Fail("{\"ha-sync-mode\":\"automatic\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -queue_master_location_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, queue_master_location_policy_validation1, [Config]). - -queue_master_location_policy_validation1(_Config) -> - PolicyName = "queue_master_location_policy_validation-policy", - Set = fun (JSON) -> - control_action_opts( - ["set_policy", PolicyName, ".*", JSON]) - end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"queue-master-locator\":\"min-masters\"}", - [{<<"queue-master-locator">>, <<"min-masters">>}]), - OK ("{\"queue-master-locator\":\"client-local\"}", - [{<<"queue-master-locator">>, <<"client-local">>}]), - OK ("{\"queue-master-locator\":\"random\"}", - [{<<"queue-master-locator">>, <<"random">>}]), - Fail("{\"queue-master-locator\":\"made_up\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -queue_modes_policy_validation(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, queue_modes_policy_validation1, [Config]). - -queue_modes_policy_validation1(_Config) -> - PolicyName = "queue_modes_policy_validation-policy", - Set = fun (JSON) -> - control_action_opts( - ["set_policy", PolicyName, ".*", JSON]) - end, - OK = fun (JSON, Def) -> - ok = Set(JSON), - true = does_policy_exist(PolicyName, [{definition, Def}]) - end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, - - OK ("{\"queue-mode\":\"lazy\"}", - [{<<"queue-mode">>, <<"lazy">>}]), - OK ("{\"queue-mode\":\"default\"}", - [{<<"queue-mode">>, <<"default">>}]), - Fail("{\"queue-mode\":\"wrong\"}"), - - ok = control_action(clear_policy, [PolicyName]), - passed. - -vhost_removed_while_updating_policy(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, vhost_removed_while_updating_policy1, [Config]). - -vhost_removed_while_updating_policy1(_Config) -> - VHost = "/vhost_removed_while_updating_policy-vhost", - PolicyName = "vhost_removed_while_updating_policy-policy", - - ok = control_action(add_vhost, [VHost]), - ok = control_action_opts( - ["set_policy", "-p", VHost, PolicyName, ".*", "{\"ha-mode\":\"all\"}"]), - true = does_policy_exist(PolicyName, []), - - %% Removing the vhost triggers the deletion of the policy. Once - %% the policy and the vhost are actually removed, RabbitMQ calls - %% update_policies() which lists policies on the given vhost. This - %% obviously fails because the vhost is gone, but the call should - %% still succeed. - ok = control_action(delete_vhost, [VHost]), - false = does_policy_exist(PolicyName, []), - - passed. - -does_policy_exist(PolicyName, Props) -> - PolicyNameBin = list_to_binary(PolicyName), - Policies = lists:filter( - fun(Policy) -> - lists:member({name, PolicyNameBin}, Policy) - end, rabbit_policy:list()), - case Policies of - [Policy] -> check_policy_props(Policy, Props); - [] -> false; - _ -> false - end. - -check_policy_props(Policy, [Prop | Rest]) -> - case lists:member(Prop, Policy) of - true -> check_policy_props(Policy, Rest); - false -> false - end; -check_policy_props(_Policy, []) -> - true. - -server_status(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, server_status1, [Config]). - -server_status1(Config) -> - %% create a few things so there is some useful information to list - {_Writer, Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(), - [Q, Q2] = [Queue || {Name, Owner} <- [{<<"server_status-q1">>, none}, - {<<"server_status-q2">>, self()}], - {new, Queue = #amqqueue{}} <- - [rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], Owner)]], - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined), - - %% list queues - ok = info_action(list_queues, - rabbit_amqqueue:info_keys(), true), - - %% as we have no way to collect output of - %% info_action/3 call, the only way we - %% can test individual queueinfoitems is by directly calling - %% rabbit_amqqueue:info/2 - [{exclusive, false}] = rabbit_amqqueue:info(Q, [exclusive]), - [{exclusive, true}] = rabbit_amqqueue:info(Q2, [exclusive]), - - %% list exchanges - ok = info_action(list_exchanges, - rabbit_exchange:info_keys(), true), - - %% list bindings - ok = info_action(list_bindings, - rabbit_binding:info_keys(), true), - %% misc binding listing APIs - [_|_] = rabbit_binding:list_for_source( - rabbit_misc:r(<<"/">>, exchange, <<"">>)), - [_] = rabbit_binding:list_for_destination( - rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)), - [_] = rabbit_binding:list_for_source_and_destination( - rabbit_misc:r(<<"/">>, exchange, <<"">>), - rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)), - - %% list connections - H = ?config(rmq_hostname, Config), - P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - {ok, C} = gen_tcp:connect(H, P, []), - gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>), - timer:sleep(100), - ok = info_action(list_connections, - rabbit_networking:connection_info_keys(), false), - %% close_connection - [ConnPid] = rabbit_ct_broker_helpers:get_connection_pids([C]), - ok = control_action(close_connection, - [rabbit_misc:pid_to_string(ConnPid), "go away"]), - - %% list channels - ok = info_action(list_channels, rabbit_channel:info_keys(), false), - - %% list consumers - ok = control_action(list_consumers, []), - - %% set vm memory high watermark - HWM = vm_memory_monitor:get_vm_memory_high_watermark(), - ok = control_action(set_vm_memory_high_watermark, ["1"]), - ok = control_action(set_vm_memory_high_watermark, ["1.0"]), - %% this will trigger an alarm - ok = control_action(set_vm_memory_high_watermark, ["0.0"]), - %% reset - ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), - - %% eval - {error_string, _} = control_action(eval, ["\""]), - {error_string, _} = control_action(eval, ["a("]), - ok = control_action(eval, ["a."]), - - %% cleanup - [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], - - unlink(Ch), - ok = rabbit_channel:shutdown(Ch), - - passed. amqp_connection_refusal(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -3395,39 +2801,6 @@ configurable_server_properties1(_Config) -> application:set_env(rabbit, server_properties, ServerProperties), passed. -memory_high_watermark(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, memory_high_watermark1, [Config]). - -memory_high_watermark1(_Config) -> - %% set vm memory high watermark - HWM = vm_memory_monitor:get_vm_memory_high_watermark(), - %% this will trigger an alarm - ok = control_action(set_vm_memory_high_watermark, - ["absolute", "2000"]), - [{{resource_limit,memory,_},[]}] = rabbit_alarm:get_alarms(), - %% reset - ok = control_action(set_vm_memory_high_watermark, - [float_to_list(HWM)]), - - passed. - -set_disk_free_limit_command(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, set_disk_free_limit_command1, [Config]). - -set_disk_free_limit_command1(_Config) -> - ok = control_action(set_disk_free_limit, - ["2000kiB"]), - 2048000 = rabbit_disk_monitor:get_disk_free_limit(), - ok = control_action(set_disk_free_limit, - ["mem_relative", "1.1"]), - ExpectedLimit = 1.1 * vm_memory_monitor:get_total_memory(), - % Total memory is unstable, so checking order - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2, - true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98, - ok = control_action(set_disk_free_limit, ["50MB"]), - passed. disk_monitor(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -3715,83 +3088,6 @@ expect_event(Tag, Key, Type) -> %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- -control_action(Command, Args) -> - control_action(Command, node(), Args, default_options()). - -control_action(Command, Args, NewOpts) -> - control_action(Command, node(), Args, - expand_options(default_options(), NewOpts)). - -control_action(Command, Node, Args, Opts) -> - case catch rabbit_control_main:action( - Command, Node, Args, Opts, - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end) of - ok -> - io:format("done.~n"), - ok; - {ok, Result} -> - rabbit_control_misc:print_cmd_result(Command, Result), - ok; - Other -> - io:format("failed: ~p~n", [Other]), - Other - end. - -control_action_t(Command, Args, Timeout) when is_number(Timeout) -> - control_action_t(Command, node(), Args, default_options(), Timeout). - -control_action_t(Command, Args, NewOpts, Timeout) when is_number(Timeout) -> - control_action_t(Command, node(), Args, - expand_options(default_options(), NewOpts), - Timeout). - -control_action_t(Command, Node, Args, Opts, Timeout) when is_number(Timeout) -> - case catch rabbit_control_main:action( - Command, Node, Args, Opts, - fun (Format, Args1) -> - io:format(Format ++ " ...~n", Args1) - end, Timeout) of - ok -> - io:format("done.~n"), - ok; - {ok, Result} -> - rabbit_control_misc:print_cmd_result(Command, Result), - ok; - Other -> - io:format("failed: ~p~n", [Other]), - Other - end. - -control_action_opts(Raw) -> - NodeStr = atom_to_list(node()), - case rabbit_control_main:parse_arguments(Raw, NodeStr) of - {ok, {Cmd, Opts, Args}} -> - case control_action(Cmd, node(), Args, Opts) of - ok -> ok; - Error -> Error - end; - Error -> - Error - end. - -info_action(Command, Args, CheckVHost) -> - ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); - true -> ok - end, - ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), - {bad_argument, dummy} = control_action(Command, ["dummy"]), - ok. - -info_action_t(Command, Args, CheckVHost, Timeout) when is_number(Timeout) -> - if CheckVHost -> ok = control_action_t(Command, [], ["-p", "/"], Timeout); - true -> ok - end, - ok = control_action_t(Command, lists:map(fun atom_to_list/1, Args), Timeout), - ok. - default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> |
