summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/rabbitmq-defaults2
-rwxr-xr-xscripts/rabbitmq-env14
-rw-r--r--scripts/rabbitmq-env.bat14
-rwxr-xr-xscripts/rabbitmq-server6
-rw-r--r--scripts/rabbitmq-server.bat3
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_cli.erl2
-rw-r--r--src/rabbit_control_main.erl1110
-rw-r--r--src/rabbit_lager.erl19
-rw-r--r--src/rabbit_log.erl1
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_mnesia_rename.erl16
-rw-r--r--src/rabbit_msg_store.erl111
-rw-r--r--src/rabbit_msg_store_ets_index.erl10
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl93
-rw-r--r--src/rabbit_plugins.erl190
-rw-r--r--src/rabbit_queue_index.erl75
-rw-r--r--src/rabbit_runtime_parameters.erl2
-rw-r--r--src/rabbit_sup.erl10
-rw-r--r--src/rabbit_upgrade.erl42
-rw-r--r--src/rabbit_variable_queue.erl289
-rw-r--r--src/rabbit_version.erl27
-rw-r--r--src/rabbit_vhost.erl32
-rw-r--r--src/rabbit_vm.erl4
-rw-r--r--test/channel_operation_timeout_test_queue.erl75
-rw-r--r--test/clustering_management_SUITE.erl28
-rw-r--r--test/eager_sync_SUITE.erl18
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl2
-rw-r--r--test/per_vhost_msg_store_SUITE.erl254
-rw-r--r--test/plugins_SUITE.erl80
-rw-r--r--test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ezbin3280 -> 0 bytes
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl86
-rw-r--r--test/sync_detection_SUITE.erl2
-rw-r--r--test/unit_inbroker_SUITE.erl880
36 files changed, 1117 insertions, 2393 deletions
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults
index fdcf624d1b..a4d78e6986 100755
--- a/scripts/rabbitmq-defaults
+++ b/scripts/rabbitmq-defaults
@@ -47,7 +47,7 @@ PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
# RABBIT_HOME can contain a version number, so default plugins
# directory can be hard to find if we want to package some plugin
# separately. When RABBITMQ_HOME points to a standard location where
-# it's usally being installed by package managers, we add
+# it's usually being installed by package managers, we add
# "/usr/lib/rabbitmq/plugins" to plugin search path.
case "$RABBITMQ_HOME" in
/usr/lib/rabbitmq/*)
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index d3014ecd66..f1624eddf9 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -220,6 +220,10 @@ rmq_normalize_path_var RABBITMQ_PID_FILE
[ "x" = "x$RABBITMQ_BOOT_MODULE" ] && RABBITMQ_BOOT_MODULE=${BOOT_MODULE}
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+rmq_normalize_path_var RABBITMQ_PLUGINS_EXPAND_DIR
+
[ "x" != "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE_source=environment
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
rmq_normalize_path_var RABBITMQ_ENABLED_PLUGINS_FILE
@@ -232,9 +236,11 @@ rmq_normalize_path_var RABBITMQ_PLUGINS_DIR
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
[ "x" != "x$RABBITMQ_LOGS" ] && export RABBITMQ_LOGS_source=environment
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
+[ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log"
-rmq_normalize_path_var \
- RABBITMQ_LOGS
+rmq_normalize_path_var RABBITMQ_LOGS
+
+rmq_normalize_path_var RABBITMQ_UPGRADE_LOG
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
@@ -247,9 +253,11 @@ rmq_check_if_shared_with_mnesia \
RABBITMQ_CONFIG_FILE \
RABBITMQ_LOG_BASE \
RABBITMQ_PID_FILE \
+ RABBITMQ_PLUGINS_EXPAND_DIR \
RABBITMQ_ENABLED_PLUGINS_FILE \
RABBITMQ_PLUGINS_DIR \
- RABBITMQ_LOGS
+ RABBITMQ_LOGS \
+ RABBITMQ_UPGRADE_LOG
##--- End of overridden <var_name> variables
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index 9426019003..56b2f69b2d 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -262,6 +262,20 @@ if "!RABBITMQ_BOOT_MODULE!"=="" (
)
)
+REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
+REM [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
+ if "!PLUGINS_EXPAND_DIR!"=="" (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!\!RABBITMQ_NODENAME!-plugins-expand
+ ) else (
+ set RABBITMQ_PLUGINS_EXPAND_DIR=!PLUGINS_EXPAND_DIR!
+ )
+)
+REM FIXME: RabbitMQ removes and recreates RABBITMQ_PLUGINS_EXPAND_DIR
+REM itself. Therefore we can't create it here in advance and escape the
+REM directory name, and RABBITMQ_PLUGINS_EXPAND_DIR must not contain
+REM non-US-ASCII characters.
+
REM [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
if "!ENABLED_PLUGINS_FILE!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index ce731516d4..41d1a81332 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -155,14 +155,16 @@ RABBITMQ_LISTEN_ARG=
[ "x" != "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_LISTEN_ARG="-rabbit tcp_listeners [{\""${RABBITMQ_NODE_IP_ADDRESS}"\","${RABBITMQ_NODE_PORT}"}]"
# If $RABBITMQ_LOGS is '-', send all log messages to stdout. This is
-# particularily useful for Docker images.
+# particularly useful for Docker images.
if [ "$RABBITMQ_LOGS" = '-' ]; then
SASL_ERROR_LOGGER=tty
RABBIT_LAGER_HANDLER=tty
+ RABBITMQ_LAGER_HANDLER_UPGRADE=tty
else
SASL_ERROR_LOGGER=false
RABBIT_LAGER_HANDLER='"'${RABBITMQ_LOGS}'"'
+ RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"'
fi
# Bump ETS table limit to 50000
@@ -216,8 +218,10 @@ start_rabbitmq_server() {
-sasl sasl_error_logger "$SASL_ERROR_LOGGER" \
-rabbit lager_log_root "\"$RABBITMQ_LOG_BASE\"" \
-rabbit lager_handler "$RABBIT_LAGER_HANDLER" \
+ -rabbit lager_handler_upgrade "$RABBITMQ_LAGER_HANDLER_UPGRADE" \
-rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 5e9b3667bc..1fc872cd9a 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -119,7 +119,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
REM If $RABBITMQ_LOGS is '-', send all log messages to stdout. This is
-REM particularily useful for Docker images.
+REM particularly useful for Docker images.
if "!RABBITMQ_LOGS!" == "-" (
set SASL_ERROR_LOGGER=tty
@@ -174,6 +174,7 @@ if "!ENV_OK!"=="false" (
-rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 24a7ab128c..5022ec020a 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -251,6 +251,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-rabbit windows_service_config \""!RABBITMQ_CONFIG_FILE:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d895d57ff2..e121fb3e2e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -153,6 +153,14 @@
{requires, core_initialized},
{enables, routing_ready}]}).
+-rabbit_boot_step({upgrade_queues,
+ [{description, "per-vhost message store migration"},
+ {mfa, {rabbit_upgrade,
+ maybe_migrate_queues_to_per_vhost_storage,
+ []}},
+ {requires, [core_initialized]},
+ {enables, recovery}]}).
+
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index dd64c6f1c8..daf2c167fa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -16,7 +16,7 @@
%% There are two types of alarms handled by this module:
%%
%% * per-node resource (disk, memory) alarms for the whole cluster. If any node
-%% has an alarm, then all publishing should be disabled througout the
+%% has an alarm, then all publishing should be disabled across the
%% cluster until all alarms clear. When a node sets such an alarm,
%% this information is automatically propagated throughout the cluster.
%% `#alarms.alarmed_nodes' is being used to track this type of alarms.
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 65e8563ddf..f6d005be90 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -154,7 +154,7 @@ start_distribution_anon(TriesLeft, _) ->
start_distribution_anon(TriesLeft - 1, Reason)
end.
-%% Tries to start distribution with random name choosen from limited list of candidates - to
+%% Tries to start distribution with random name chosen from limited list of candidates - to
%% prevent atom table pollution on target nodes.
start_distribution() ->
rabbit_nodes:ensure_epmd(),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
deleted file mode 100644
index d96c1dd476..0000000000
--- a/src/rabbit_control_main.erl
+++ /dev/null
@@ -1,1110 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_control_main).
--include("rabbit.hrl").
--include("rabbit_cli.hrl").
--include("rabbit_misc.hrl").
-
--export([start/0, stop/0, parse_arguments/2, action/5, action/6,
- sync_queue/1, cancel_sync_queue/1, become/1,
- purge_queue/1]).
-
--import(rabbit_misc, [rpc_call/4, rpc_call/5]).
-
--define(EXTERNAL_CHECK_INTERVAL, 1000).
-
--define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node), ?TIMEOUT_DEF]).
-
--define(COMMANDS,
- [stop,
- stop_app,
- start_app,
- wait,
- reset,
- force_reset,
- rotate_logs,
- hipe_compile,
-
- {join_cluster, [?RAM_DEF]},
- change_cluster_node_type,
- update_cluster_nodes,
- {forget_cluster_node, [?OFFLINE_DEF]},
- rename_cluster_node,
- force_boot,
- cluster_status,
- {sync_queue, [?VHOST_DEF]},
- {cancel_sync_queue, [?VHOST_DEF]},
- {purge_queue, [?VHOST_DEF]},
-
- add_user,
- delete_user,
- change_password,
- clear_password,
- authenticate_user,
- set_user_tags,
- list_users,
-
- add_vhost,
- delete_vhost,
- list_vhosts,
- {set_permissions, [?VHOST_DEF]},
- {clear_permissions, [?VHOST_DEF]},
- {list_permissions, [?VHOST_DEF]},
- list_user_permissions,
-
- {set_parameter, [?VHOST_DEF]},
- {clear_parameter, [?VHOST_DEF]},
- {list_parameters, [?VHOST_DEF]},
-
- set_global_parameter,
- clear_global_parameter,
- list_global_parameters,
-
- {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
- {clear_policy, [?VHOST_DEF]},
- {set_operator_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
- {clear_operator_policy, [?VHOST_DEF]},
- {list_policies, [?VHOST_DEF]},
- {list_operator_policies, [?VHOST_DEF]},
-
- {set_vhost_limits, [?VHOST_DEF]},
- {clear_vhost_limits, [?VHOST_DEF]},
- {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
- {list_exchanges, [?VHOST_DEF]},
- {list_bindings, [?VHOST_DEF]},
- {list_connections, [?VHOST_DEF]},
- list_channels,
- {list_consumers, [?VHOST_DEF]},
- status,
- environment,
- report,
- set_cluster_name,
- eval,
- node_health_check,
-
- close_connection,
- {trace_on, [?VHOST_DEF]},
- {trace_off, [?VHOST_DEF]},
- set_vm_memory_high_watermark,
- set_disk_free_limit,
- help,
- {encode, [?DECODE_DEF, ?CIPHER_DEF, ?HASH_DEF, ?ITERATIONS_DEF, ?LIST_CIPHERS_DEF, ?LIST_HASHES_DEF]}
- ]).
-
--define(GLOBAL_QUERIES,
- [{"Connections", rabbit_networking, connection_info_all,
- connection_info_keys},
- {"Channels", rabbit_channel, info_all, info_keys}]).
-
--define(VHOST_QUERIES,
- [{"Queues", rabbit_amqqueue, info_all, info_keys},
- {"Exchanges", rabbit_exchange, info_all, info_keys},
- {"Bindings", rabbit_binding, info_all, info_keys},
- {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
- {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
- vhost_perms_info_keys},
- {"Policies", rabbit_policy, list_formatted, info_keys},
- {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
-
--define(COMMANDS_NOT_REQUIRING_APP,
- [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
- join_cluster, change_cluster_node_type, update_cluster_nodes,
- forget_cluster_node, rename_cluster_node, cluster_status, status,
- environment, eval, force_boot, help, hipe_compile, encode]).
-
-%% [Command | {Command, DefaultTimeoutInMilliSeconds}]
--define(COMMANDS_WITH_TIMEOUT,
- [list_user_permissions, list_policies, list_queues, list_exchanges,
- list_bindings, list_connections, list_channels, list_consumers,
- list_vhosts, list_parameters, list_global_parameters,
- purge_queue,
- {node_health_check, 70000}]).
-
-%%----------------------------------------------------------------------------
-
--spec start() -> no_return().
--spec stop() -> 'ok'.
--spec action
- (atom(), node(), [string()], [{string(), any()}],
- fun ((string(), [any()]) -> 'ok')) ->
- 'ok'.
-
--spec action
- (atom(), node(), [string()], [{string(), any()}],
- fun ((string(), [any()]) -> 'ok'), timeout()) ->
- 'ok'.
-
-%%----------------------------------------------------------------------------
-
-start() ->
- rabbit_cli:main(
- fun (Args, NodeStr) ->
- parse_arguments(Args, NodeStr)
- end,
- fun (Command, Node, Args, Opts) ->
- Quiet = proplists:get_bool(?QUIET_OPT, Opts),
- Inform = case Quiet of
- true -> fun (_Format, _Args1) -> ok end;
- false -> fun (Format, Args1) ->
- io:format(Format ++ " ...~n", Args1)
- end
- end,
- try
- T = case get_timeout(Command, Opts) of
- {ok, Timeout} ->
- Timeout;
- {error, _} ->
- %% since this is an error with user input, ignore the quiet
- %% setting
- io:format("Failed to parse provided timeout value, using ~s~n", [?RPC_TIMEOUT]),
- ?RPC_TIMEOUT
- end,
- do_action(Command, Node, Args, Opts, Inform, T)
- catch _:E -> E
- end
- end, rabbit_ctl_usage).
-
-parse_arguments(CmdLine, NodeStr) ->
- rabbit_cli:parse_arguments(
- ?COMMANDS, ?GLOBAL_DEFS(NodeStr), ?NODE_OPT, CmdLine).
-
-print_report(Node, {Descr, Module, InfoFun, KeysFun}) ->
- io:format("~s:~n", [Descr]),
- print_report0(Node, {Module, InfoFun, KeysFun}, []).
-
-print_report(Node, {Descr, Module, InfoFun, KeysFun}, VHostArg) ->
- io:format("~s on ~s:~n", [Descr, VHostArg]),
- print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg).
-
-print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
- case rpc_call(Node, Module, InfoFun, VHostArg) of
- [_|_] = Results -> InfoItems = rpc_call(Node, Module, KeysFun, []),
- display_row([atom_to_list(I) || I <- InfoItems]),
- display_info_list(Results, InfoItems);
- _ -> ok
- end,
- io:nl().
-
-get_timeout(Command, Opts) ->
- Default = case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of
- none ->
- infinity;
- {Command, true} ->
- ?RPC_TIMEOUT;
- {Command, D} ->
- D
- end,
- Result = case proplists:get_value(?TIMEOUT_OPT, Opts, Default) of
- use_default ->
- parse_timeout(Default);
- Value ->
- parse_timeout(Value)
- end,
- Result.
-
-
-parse_number(N) when is_list(N) ->
- try list_to_integer(N) of
- Val -> Val
- catch error:badarg ->
- %% could have been a float, give it
- %% another shot
- list_to_float(N)
- end.
-
-parse_timeout("infinity") ->
- {ok, infinity};
-parse_timeout(infinity) ->
- {ok, infinity};
-parse_timeout(N) when is_list(N) ->
- try parse_number(N) of
- M ->
- Y = case M >= 0 of
- true -> round(M) * 1000;
- false -> ?RPC_TIMEOUT
- end,
- {ok, Y}
- catch error:badarg ->
- {error, infinity}
- end;
-parse_timeout(N) ->
- {ok, N}.
-
-announce_timeout(infinity, _Inform) ->
- %% no-op
- ok;
-announce_timeout(Timeout, Inform) when is_number(Timeout) ->
- Inform("Timeout: ~w seconds", [Timeout/1000]),
- ok.
-
-stop() ->
- ok.
-
-%%----------------------------------------------------------------------------
-
-do_action(Command, Node, Args, Opts, Inform, Timeout) ->
- case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of
- false ->
- case ensure_app_running(Node) of
- ok ->
- case proplists:lookup(Command, ?COMMANDS_WITH_TIMEOUT) of
- {Command, _} ->
- announce_timeout(Timeout, Inform),
- action(Command, Node, Args, Opts, Inform, Timeout);
- none ->
- action(Command, Node, Args, Opts, Inform)
- end;
- E -> E
- end;
- true ->
- action(Command, Node, Args, Opts, Inform)
- end.
-
-action(stop, Node, Args, _Opts, Inform) ->
- Inform("Stopping and halting node ~p", [Node]),
- Res = call(Node, {rabbit, stop_and_halt, []}),
- case {Res, Args} of
- {ok, [PidFile]} -> wait_for_process_death(
- read_pid_file(PidFile, false));
- {ok, [_, _| _]} -> exit({badarg, Args});
- _ -> ok
- end,
- Res;
-
-action(stop_app, Node, [], _Opts, Inform) ->
- Inform("Stopping rabbit application on node ~p", [Node]),
- call(Node, {rabbit, stop, []});
-
-action(start_app, Node, [], _Opts, Inform) ->
- Inform("Starting node ~p", [Node]),
- call(Node, {rabbit, start, []});
-
-action(reset, Node, [], _Opts, Inform) ->
- Inform("Resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
- fun() ->
- call(Node, {rabbit_mnesia, reset, []})
- end);
-
-action(force_reset, Node, [], _Opts, Inform) ->
- Inform("Forcefully resetting node ~p", [Node]),
- require_mnesia_stopped(Node,
- fun() ->
- call(Node, {rabbit_mnesia, force_reset, []})
- end);
-
-action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
- ClusterNode = list_to_atom(ClusterNodeS),
- NodeType = case proplists:get_bool(?RAM_OPT, Opts) of
- true -> ram;
- false -> disc
- end,
- Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
- fun() ->
- rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, NodeType])
- end);
-
-action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
- Inform("Turning ~p into a ram node", [Node]),
- require_mnesia_stopped(Node,
- fun() ->
- rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram])
- end);
-action(change_cluster_node_type, Node, [Type], _Opts, Inform)
- when Type =:= "disc" orelse Type =:= "disk" ->
- Inform("Turning ~p into a disc node", [Node]),
- require_mnesia_stopped(Node,
- fun() ->
- rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc])
- end);
-
-action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
- ClusterNode = list_to_atom(ClusterNodeS),
- Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
- require_mnesia_stopped(Node,
- fun() ->
- rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode])
- end);
-
-action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
- ClusterNode = list_to_atom(ClusterNodeS),
- RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
- Inform("Removing node ~p from cluster", [ClusterNode]),
- case RemoveWhenOffline of
- true -> become(Node),
- rabbit_mnesia:forget_cluster_node(ClusterNode, true);
- false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node,
- [ClusterNode, false])
- end;
-
-action(rename_cluster_node, Node, NodesS, _Opts, Inform) ->
- Nodes = split_list([list_to_atom(N) || N <- NodesS]),
- Inform("Renaming cluster nodes:~n~s~n",
- [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) ||
- {F, T} <- Nodes])]),
- rabbit_mnesia_rename:rename(Node, Nodes);
-
-action(force_boot, Node, [], _Opts, Inform) ->
- Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
- case rabbit:is_running(Node) of
- false -> rabbit_mnesia:force_load_next_boot();
- true -> {error, rabbit_running}
- end;
-
-action(sync_queue, Node, [Q], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
- Inform("Synchronising ~s", [rabbit_misc:rs(QName)]),
- rpc_call(Node, rabbit_control_main, sync_queue, [QName]);
-
-action(cancel_sync_queue, Node, [Q], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
- Inform("Stopping synchronising ~s", [rabbit_misc:rs(QName)]),
- rpc_call(Node, rabbit_control_main, cancel_sync_queue, [QName]);
-
-action(wait, Node, [PidFile], _Opts, Inform) ->
- Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
-action(wait, Node, [PidFile, App], _Opts, Inform) ->
- Inform("Waiting for ~p on ~p", [App, Node]),
- wait_for_application(Node, PidFile, list_to_atom(App), Inform);
-
-action(status, Node, [], _Opts, Inform) ->
- Inform("Status of node ~p", [Node]),
- display_call_result(Node, {rabbit, status, []});
-
-action(cluster_status, Node, [], _Opts, Inform) ->
- Inform("Cluster status of node ~p", [Node]),
- Status = unsafe_rpc(Node, rabbit_mnesia, status, []),
- io:format("~p~n", [Status ++ [{alarms,
- [alarms_by_node(Name) || Name <- nodes_in_cluster(Node)]}]]),
- ok;
-
-action(environment, Node, _App, _Opts, Inform) ->
- Inform("Application environment of node ~p", [Node]),
- display_call_result(Node, {rabbit, environment, []});
-
-action(rotate_logs, Node, [], _Opts, Inform) ->
- Inform("Rotating logs for node ~p", [Node]),
- call(Node, {rabbit, rotate_logs, []});
-
-action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) ->
- ok = application:load(rabbit),
- case rabbit_hipe:can_hipe_compile() of
- true ->
- {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir),
- ok;
- false ->
- {error, "HiPE compilation is not supported"}
- end;
-
-action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
- Inform("Closing connection \"~s\"", [PidStr]),
- rpc_call(Node, rabbit_networking, close_connection,
- [rabbit_misc:string_to_pid(PidStr), Explanation]);
-
-action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
- Inform("Creating user \"~s\"", [Username]),
- call(Node, {rabbit_auth_backend_internal, add_user, Args});
-
-action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
- Inform("Deleting user \"~s\"", Args),
- call(Node, {rabbit_auth_backend_internal, delete_user, Args});
-
-action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
- Inform("Changing password for user \"~s\"", [Username]),
- call(Node, {rabbit_auth_backend_internal, change_password, Args});
-
-action(clear_password, Node, Args = [Username], _Opts, Inform) ->
- Inform("Clearing password for user \"~s\"", [Username]),
- call(Node, {rabbit_auth_backend_internal, clear_password, Args});
-
-action(authenticate_user, Node, Args = [Username, _Password], _Opts, Inform) ->
- Inform("Authenticating user \"~s\"", [Username]),
- call(Node, {rabbit_access_control, check_user_pass_login, Args});
-
-action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
- Tags = [list_to_atom(T) || T <- TagsStr],
- Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
- rpc_call(Node, rabbit_auth_backend_internal, set_tags,
- [list_to_binary(Username), Tags]);
-
-action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Creating vhost \"~s\"", Args),
- call(Node, {rabbit_vhost, add, Args});
-
-action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Deleting vhost \"~s\"", Args),
- call(Node, {rabbit_vhost, delete, Args});
-
-action(trace_on, Node, [], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Starting tracing for vhost \"~s\"", [VHost]),
- rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
-
-action(trace_off, Node, [], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Stopping tracing for vhost \"~s\"", [VHost]),
- rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
-
-action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
- Frac = list_to_float(case string:chr(Arg, $.) of
- 0 -> Arg ++ ".0";
- _ -> Arg
- end),
- Inform("Setting memory threshold on ~p to ~p", [Node, Frac]),
- rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
-
-action(set_vm_memory_high_watermark, Node, ["absolute", Arg], _Opts, Inform) ->
- case rabbit_resource_monitor_misc:parse_information_unit(Arg) of
- {ok, Limit} ->
- Inform("Setting memory threshold on ~p to ~p bytes", [Node, Limit]),
- rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark,
- [{absolute, Limit}]);
- {error, parse_error} ->
- {error_string, rabbit_misc:format(
- "Unable to parse absolute memory limit value ~p", [Arg])}
- end;
-
-action(set_disk_free_limit, Node, [Arg], _Opts, Inform) ->
- case rabbit_resource_monitor_misc:parse_information_unit(Arg) of
- {ok, Limit} ->
- Inform("Setting disk free limit on ~p to ~p bytes", [Node, Limit]),
- rpc_call(Node, rabbit_disk_monitor, set_disk_free_limit, [Limit]);
- {error, parse_error} ->
- {error_string, rabbit_misc:format(
- "Unable to parse disk free limit value ~p", [Arg])}
- end;
-
-action(set_disk_free_limit, Node, ["mem_relative", Arg], _Opts, Inform) ->
- Frac = list_to_float(case string:chr(Arg, $.) of
- 0 -> Arg ++ ".0";
- _ -> Arg
- end),
- Inform("Setting disk free limit on ~p to ~p of total RAM", [Node, Frac]),
- rpc_call(Node,
- rabbit_disk_monitor,
- set_disk_free_limit,
- [{mem_relative, Frac}]);
-
-
-action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
- [Username, VHost]),
- call(Node, {rabbit_auth_backend_internal, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
-
-action(clear_permissions, Node, [Username], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
- [Username, VHost]),
- call(Node, {rabbit_auth_backend_internal, clear_permissions,
- [Username, VHost]});
-
-action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Setting runtime parameter ~p for component ~p to ~p",
- [Key, Component, Value]),
- rpc_call(
- Node, rabbit_runtime_parameters, parse_set,
- [VHostArg, list_to_binary(Component), list_to_binary(Key), Value, none]);
-
-action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
- rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
- list_to_binary(Component),
- list_to_binary(Key)]);
-
-action(set_global_parameter, Node, [Key, Value], _Opts, Inform) ->
- Inform("Setting global runtime parameter ~p to ~p", [Key, Value]),
- rpc_call(
- Node, rabbit_runtime_parameters, parse_set_global,
- [rabbit_data_coercion:to_atom(Key), rabbit_data_coercion:to_binary(Value)]
- );
-
-action(clear_global_parameter, Node, [Key], _Opts, Inform) ->
- Inform("Clearing global runtime parameter ~p", [Key]),
- rpc_call(
- Node, rabbit_runtime_parameters, clear_global,
- [rabbit_data_coercion:to_atom(Key)]
- );
-
-action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
- Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
- ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
- Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
- Res = rpc_call(
- Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), list_to_binary(Pattern), list_to_binary(Defn), list_to_binary(PriorityArg), ApplyToArg]),
- case Res of
- {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
- {error_string, rabbit_misc:format(Format, Args)};
- _ ->
- Res
- end;
-
-action(clear_policy, Node, [Key], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Clearing policy ~p", [Key]),
- rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
-
-action(set_operator_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
- Msg = "Setting operator policy override ~p for pattern ~p to ~p with priority ~p",
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
- ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
- Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
- Res = rpc_call(
- Node, rabbit_policy, parse_set_op,
- [VHostArg, list_to_binary(Key), list_to_binary(Pattern), list_to_binary(Defn), list_to_binary(PriorityArg), ApplyToArg]),
- case Res of
- {error, Format, Args} when is_list(Format) andalso is_list(Args) ->
- {error_string, rabbit_misc:format(Format, Args)};
- _ ->
- Res
- end;
-
-action(clear_operator_policy, Node, [Key], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Clearing operator policy ~p", [Key]),
- rpc_call(Node, rabbit_policy, delete_op, [VHostArg, list_to_binary(Key)]);
-
-action(set_vhost_limits, Node, [Defn], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Setting vhost limits for vhost ~p", [VHostArg]),
- rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]),
- ok;
-
-action(clear_vhost_limits, Node, [], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Clearing vhost ~p limits", [VHostArg]),
- rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]);
-
-action(report, Node, _Args, _Opts, Inform) ->
- Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
- [begin ok = action(Action, N, [], [], Inform), io:nl() end ||
- N <- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]),
- Action <- [status, cluster_status, environment]],
- VHosts = unsafe_rpc(Node, rabbit_vhost, list, []),
- [print_report(Node, Q) || Q <- ?GLOBAL_QUERIES],
- [print_report(Node, Q, [V]) || Q <- ?VHOST_QUERIES, V <- VHosts],
- ok;
-
-action(set_cluster_name, Node, [Name], _Opts, Inform) ->
- Inform("Setting cluster name to ~s", [Name]),
- rpc_call(Node, rabbit_nodes, set_cluster_name, [list_to_binary(Name)]);
-
-action(eval, Node, [Expr], _Opts, _Inform) ->
- case erl_scan:string(Expr) of
- {ok, Scanned, _} ->
- case erl_parse:parse_exprs(Scanned) of
- {ok, Parsed} -> {value, Value, _} =
- unsafe_rpc(
- Node, erl_eval, exprs, [Parsed, []]),
- io:format("~p~n", [Value]),
- ok;
- {error, E} -> {error_string, format_parse_error(E)}
- end;
- {error, E, _} ->
- {error_string, format_parse_error(E)}
- end;
-
-action(help, _Node, _Args, _Opts, _Inform) ->
- io:format("~s", [rabbit_ctl_usage:usage()]);
-
-action(encode, _Node, Args, Opts, _Inform) ->
- ListCiphers = lists:member({?LIST_CIPHERS_OPT, true}, Opts),
- ListHashes = lists:member({?LIST_HASHES_OPT, true}, Opts),
- Decode = lists:member({?DECODE_OPT, true}, Opts),
- Cipher = list_to_atom(proplists:get_value(?CIPHER_OPT, Opts)),
- Hash = list_to_atom(proplists:get_value(?HASH_OPT, Opts)),
- Iterations = list_to_integer(proplists:get_value(?ITERATIONS_OPT, Opts)),
-
- {_, Msg} = rabbit_control_pbe:encode(ListCiphers, ListHashes, Decode, Cipher, Hash, Iterations, Args),
- io:format(Msg ++ "~n");
-
-action(Command, Node, Args, Opts, Inform) ->
- %% For backward compatibility, run commands accepting a timeout with
- %% the default timeout.
- action(Command, Node, Args, Opts, Inform, ?RPC_TIMEOUT).
-
-action(purge_queue, _Node, [], _Opts, _Inform, _Timeout) ->
- {error, "purge_queue takes queue name as an argument"};
-
-action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
- Inform("Purging ~s", [rabbit_misc:rs(QRes)]),
- rpc_call(Node, rabbit_control_main, purge_queue, [QRes], Timeout);
-
-action(list_users, Node, [], _Opts, Inform, Timeout) ->
- Inform("Listing users", []),
- call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
- rabbit_auth_backend_internal:user_info_keys(),
- [{timeout, Timeout}, to_bin_utf8]);
-
-action(list_permissions, Node, [], Opts, Inform, Timeout) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Listing permissions in vhost \"~s\"", [VHost]),
- call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
- rabbit_auth_backend_internal:vhost_perms_info_keys(),
- [{timeout, Timeout}, to_bin_utf8, is_escaped]);
-
-action(list_parameters, Node, [], Opts, Inform, Timeout) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Listing runtime parameters", []),
- call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
- rabbit_runtime_parameters:info_keys(),
- [{timeout, Timeout}]);
-
-action(list_global_parameters, Node, [], _Opts, Inform, Timeout) ->
- Inform("Listing global runtime parameters", []),
- call_emitter(Node, {rabbit_runtime_parameters, list_global_formatted, []},
- rabbit_runtime_parameters:global_info_keys(),
- [{timeout, Timeout}]);
-
-action(list_policies, Node, [], Opts, Inform, Timeout) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Listing policies", []),
- call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
- rabbit_policy:info_keys(),
- [{timeout, Timeout}]);
-
-action(list_operator_policies, Node, [], Opts, Inform, Timeout) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Listing policies", []),
- call_emitter(Node, {rabbit_policy, list_formatted_op, [VHostArg]},
- rabbit_policy:info_keys(),
- [{timeout, Timeout}]);
-
-
-action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
- Inform("Listing vhosts", []),
- ArgAtoms = default_if_empty(Args, [name]),
- call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
- [{timeout, Timeout}, to_bin_utf8]);
-
-action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
- {error_string,
- "list_user_permissions expects a username argument, but none provided."};
-action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
- Inform("Listing permissions for user ~p", Args),
- call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
- rabbit_auth_backend_internal:user_perms_info_keys(),
- [{timeout, Timeout}, to_bin_utf8, is_escaped]);
-
-action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- case rabbit_cli:mutually_exclusive_flags(
- Opts, all, [{?ONLINE_OPT, online}
- ,{?OFFLINE_OPT, offline}
- ,{?LOCAL_OPT, local}]) of
- {ok, Filter} ->
- Inform("Listing queues", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, messages]),
-
- %% Data for emission
- Nodes = nodes_in_cluster(Node, Timeout),
- ChunksOpt = {chunks, get_number_of_chunks(Filter, Nodes)},
- TimeoutOpt = {timeout, Timeout},
- EmissionRef = make_ref(),
- EmissionRefOpt = {ref, EmissionRef},
-
- case Filter of
- all ->
- start_emission(Node, {rabbit_amqqueue, emit_info_all,
- [Nodes, VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]),
- start_emission(Node, {rabbit_amqqueue, emit_info_down,
- [VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]);
- online ->
- start_emission(Node, {rabbit_amqqueue, emit_info_all,
- [Nodes, VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]);
- offline ->
- start_emission(Node, {rabbit_amqqueue, emit_info_down,
- [VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt]);
- local ->
- start_emission(Node, {rabbit_amqqueue, emit_info_local,
- [VHostArg, ArgAtoms]},
- [TimeoutOpt, EmissionRefOpt])
- end,
- display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
- {error, ErrStr} ->
- {error_string, ErrStr}
- end;
-
-action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
- Inform("Listing exchanges", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, type]),
- call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, [{timeout, Timeout}]);
-
-action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
- Inform("Listing bindings", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [source_name, source_kind,
- destination_name, destination_kind,
- routing_key, arguments]),
- call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, [{timeout, Timeout}]);
-
-action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
- Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- Nodes = nodes_in_cluster(Node, Timeout),
- call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
- ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
-
-action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
- Inform("Listing channels", []),
- ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
- messages_unacknowledged]),
- Nodes = nodes_in_cluster(Node, Timeout),
- call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
- [{timeout, Timeout}, {chunks, length(Nodes)}]);
-
-action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
- Inform("Listing consumers", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Nodes = nodes_in_cluster(Node, Timeout),
- call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
- rabbit_amqqueue:consumer_info_keys(),
- [{timeout, Timeout}, {chunks, length(Nodes)}]);
-
-action(node_health_check, Node, _Args, _Opts, Inform, Timeout) ->
- Inform("Checking health of node ~p", [Node]),
- case rabbit_health_check:node(Node, Timeout) of
- ok ->
- io:format("Health check passed~n"),
- ok;
- Other ->
- Other
- end.
-
-format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
-
-sync_queue(Q) ->
- rabbit_mirror_queue_misc:sync_queue(Q).
-
-cancel_sync_queue(Q) ->
- rabbit_mirror_queue_misc:cancel_sync_queue(Q).
-
-purge_queue(Q) ->
- rabbit_amqqueue:with(
- Q, fun(Q1) ->
- rabbit_amqqueue:purge(Q1),
- ok
- end).
-
-%%----------------------------------------------------------------------------
-
-require_mnesia_stopped(Node, Fun) ->
- case Fun() of
- {error, mnesia_unexpectedly_running} ->
- {error_string, rabbit_misc:format(
- " Mnesia is still running on node ~p.
- Please stop the node with rabbitmqctl stop_app first.", [Node])};
- Other -> Other
- end.
-
-wait_for_application(Node, PidFile, Application, Inform) ->
- Pid = read_pid_file(PidFile, true),
- Inform("pid is ~s", [Pid]),
- wait_for_application(Node, Pid, Application).
-
-wait_for_application(Node, Pid, rabbit_and_plugins) ->
- wait_for_startup(Node, Pid);
-wait_for_application(Node, Pid, Application) ->
- while_process_is_alive(
- Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
-
-wait_for_startup(Node, Pid) ->
- while_process_is_alive(
- Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
-
-while_process_is_alive(Node, Pid, Activity) ->
- case rabbit_misc:is_os_process_alive(Pid) of
- true -> case Activity() of
- true -> ok;
- false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- while_process_is_alive(Node, Pid, Activity)
- end;
- false -> {error, process_not_running}
- end.
-
-wait_for_process_death(Pid) ->
- case rabbit_misc:is_os_process_alive(Pid) of
- true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_process_death(Pid);
- false -> ok
- end.
-
-read_pid_file(PidFile, Wait) ->
- case {file:read_file(PidFile), Wait} of
- {{ok, Bin}, _} ->
- S = binary_to_list(Bin),
- {match, [PidS]} = re:run(S, "[^\\s]+",
- [{capture, all, list}]),
- try list_to_integer(PidS)
- catch error:badarg ->
- exit({error, {garbage_in_pid_file, PidFile}})
- end,
- PidS;
- {{error, enoent}, true} ->
- timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- read_pid_file(PidFile, Wait);
- {{error, _} = E, _} ->
- exit({error, {could_not_read_pid, E}})
- end.
-
-become(BecomeNode) ->
- error_logger:tty(false),
- case net_adm:ping(BecomeNode) of
- pong -> exit({node_running, BecomeNode});
- pang -> ok = net_kernel:stop(),
- io:format(" * Impersonating node: ~s...", [BecomeNode]),
- {ok, _} = rabbit_cli:start_distribution(BecomeNode),
- io:format(" done~n", []),
- Dir = mnesia:system_info(directory),
- io:format(" * Mnesia directory : ~s~n", [Dir])
- end.
-
-%%----------------------------------------------------------------------------
-
-default_if_empty(List, Default) when is_list(List) ->
- if List == [] -> Default;
- true -> [list_to_atom(X) || X <- List]
- end.
-
-display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
- display_row([format_info_item(
- case proplists:lookup(X, Result) of
- none when is_list(Result), length(Result) > 0 ->
- exit({error, {bad_info_key, X}});
- none -> Result;
- {X, Value} -> Value
- end, IsEscaped) || X <- InfoItemKeys]).
-
-display_info_message(IsEscaped, InfoItemKeys) ->
- fun ([], _) ->
- ok;
- ([FirstResult|_] = List, _) when is_list(FirstResult) ->
- lists:foreach(fun(Result) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys)
- end,
- List),
- ok;
- (Result, _) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys),
- ok
- end.
-
-display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(
- fun (Result) -> display_row(
- [format_info_item(proplists:get_value(X, Result), true)
- || X <- InfoItemKeys])
- end, lists:sort(Results)),
- ok;
-display_info_list(Other, _) ->
- Other.
-
-display_row(Row) ->
- io:fwrite(string:join(Row, "\t")),
- io:nl().
-
--define(IS_U8(X), (X >= 0 andalso X =< 255)).
--define(IS_U16(X), (X >= 0 andalso X =< 65535)).
-
-format_info_item(#resource{name = Name}, IsEscaped) ->
- escape(Name, IsEscaped);
-format_info_item({N1, N2, N3, N4} = Value, _IsEscaped) when
- ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
- rabbit_misc:ntoa(Value);
-format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value, _IsEscaped) when
- ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
- ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
- rabbit_misc:ntoa(Value);
-format_info_item(Value, _IsEscaped) when is_pid(Value) ->
- rabbit_misc:pid_to_string(Value);
-format_info_item(Value, IsEscaped) when is_binary(Value) ->
- escape(Value, IsEscaped);
-format_info_item(Value, IsEscaped) when is_atom(Value) ->
- escape(atom_to_list(Value), IsEscaped);
-format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
- Value, IsEscaped) when is_binary(TableEntryKey) andalso
- is_atom(TableEntryType) ->
- io_lib:format("~1000000000000p", [prettify_amqp_table(Value, IsEscaped)]);
-format_info_item([T | _] = Value, IsEscaped)
- when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
- is_list(T) ->
- "[" ++
- lists:nthtail(2, lists:append(
- [", " ++ format_info_item(E, IsEscaped)
- || E <- Value])) ++ "]";
-format_info_item({Key, Value}, IsEscaped) ->
- "{" ++ io_lib:format("~p", [Key]) ++ ", " ++
- format_info_item(Value, IsEscaped) ++ "}";
-format_info_item(Value, _IsEscaped) ->
- io_lib:format("~w", [Value]).
-
-display_call_result(Node, MFA) ->
- case call(Node, MFA) of
- {badrpc, _} = Res -> throw(Res);
- Res -> io:format("~p~n", [Res]),
- ok
- end.
-
-unsafe_rpc(Node, Mod, Fun, Args) ->
- unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
-
-unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
- case rpc_call(Node, Mod, Fun, Args, Timeout) of
- {badrpc, _} = Res -> throw(Res);
- Normal -> Normal
- end.
-
-ensure_app_running(Node) ->
- case call(Node, {rabbit, is_running, []}) of
- true -> ok;
- false -> {error_string,
- rabbit_misc:format(
- "rabbit application is not running on node ~s.~n"
- " * Suggestion: start it with \"rabbitmqctl start_app\" "
- "and try again", [Node])};
- Other -> Other
- end.
-
-call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
-
-call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
- Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
- display_emission_result(Ref, InfoKeys, Opts).
-
-start_emission(Node, {Mod, Fun, Args}, Opts) ->
- ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
- Timeout = proplists:get_value(timeout, Opts, infinity),
- Ref = proplists:get_value(ref, Opts, make_ref()),
- rabbit_control_misc:spawn_emitter_caller(
- Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
- Ref, self(), Timeout),
- Ref.
-
-display_emission_result(Ref, InfoKeys, Opts) ->
- IsEscaped = proplists:get_value(is_escaped, Opts, false),
- Chunks = proplists:get_value(chunks, Opts, 1),
- Timeout = proplists:get_value(timeout, Opts, infinity),
- EmissionStatus = rabbit_control_misc:wait_for_info_messages(
- self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
- emission_to_action_result(EmissionStatus).
-
-%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
-%% into form expected by rabbit_cli:main/3.
-emission_to_action_result({ok, ok}) ->
- ok;
-emission_to_action_result({error, Error}) ->
- Error.
-
-prepare_call_args(Args, ToBinUtf8) ->
- case ToBinUtf8 of
- true -> valid_utf8_args(Args);
- false -> Args
- end.
-
-valid_utf8_args(Args) ->
- lists:map(fun list_to_binary_utf8/1, Args).
-
-list_to_binary_utf8(L) ->
- B = list_to_binary(L),
- case rabbit_binary_parser:validate_utf8(B) of
- ok -> B;
- error -> throw({error, {not_utf_8, L}})
- end.
-
-%% escape does C-style backslash escaping of non-printable ASCII
-%% characters. We don't escape characters above 127, since they may
-%% form part of UTF-8 strings.
-
-escape(Atom, IsEscaped) when is_atom(Atom) ->
- escape(atom_to_list(Atom), IsEscaped);
-escape(Bin, IsEscaped) when is_binary(Bin) ->
- escape(binary_to_list(Bin), IsEscaped);
-escape(L, false) when is_list(L) ->
- escape_char(lists:reverse(L), []);
-escape(L, true) when is_list(L) ->
- L.
-
-escape_char([$\\ | T], Acc) ->
- escape_char(T, [$\\, $\\ | Acc]);
-escape_char([X | T], Acc) when X >= 32, X /= 127 ->
- escape_char(T, [X | Acc]);
-escape_char([X | T], Acc) ->
- escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
- $0 + (X band 7) | Acc]);
-escape_char([], Acc) ->
- Acc.
-
-prettify_amqp_table(Table, IsEscaped) ->
- [{escape(K, IsEscaped), prettify_typed_amqp_value(T, V, IsEscaped)}
- || {K, T, V} <- Table].
-
-prettify_typed_amqp_value(longstr, Value, IsEscaped) ->
- escape(Value, IsEscaped);
-prettify_typed_amqp_value(table, Value, IsEscaped) ->
- prettify_amqp_table(Value, IsEscaped);
-prettify_typed_amqp_value(array, Value, IsEscaped) ->
- [prettify_typed_amqp_value(T, V, IsEscaped) || {T, V} <- Value];
-prettify_typed_amqp_value(_Type, Value, _IsEscaped) ->
- Value.
-
-split_list([]) -> [];
-split_list([_]) -> exit(even_list_needed);
-split_list([A, B | T]) -> [{A, B} | split_list(T)].
-
-nodes_in_cluster(Node) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
-
-nodes_in_cluster(Node, Timeout) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
-
-alarms_by_node(Name) ->
- case rpc_call(Name, rabbit, status, []) of
- {badrpc,nodedown} -> {Name, [nodedown]};
- Status ->
- {_, As} = lists:keyfind(alarms, 1, Status),
- {Name, As}
- end.
-
-get_number_of_chunks(all, Nodes) ->
- length(Nodes) + 1;
-get_number_of_chunks(online, Nodes) ->
- length(Nodes);
-get_number_of_chunks(offline, _) ->
- 1;
-get_number_of_chunks(local, _) ->
- 1.
diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl
index 8beee10846..c1ed613088 100644
--- a/src/rabbit_lager.erl
+++ b/src/rabbit_lager.erl
@@ -210,7 +210,7 @@ configure_lager() ->
%% messages to the default sink. To know the list of expected extra
%% sinks, we look at the 'lager_extra_sinks' compilation option.
Sinks0 = application:get_env(lager, extra_sinks, []),
- Sinks1 = configure_extra_sinks(Sinks0,
+ Sinks1 = configure_extra_sinks(Sinks0,
[error_logger | list_expected_sinks()]),
%% TODO Waiting for basho/lager#303
%% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1,
@@ -231,11 +231,7 @@ configure_lager() ->
configure_extra_sinks(Sinks, [SinkName | Rest]) ->
Sink0 = proplists:get_value(SinkName, Sinks, []),
Sink1 = case proplists:is_defined(handlers, Sink0) of
- false -> lists:keystore(handlers, 1, Sink0,
- {handlers,
- [{lager_forwarder_backend,
- lager_util:make_internal_sink_name(lager)
- }]});
+ false -> default_sink_config(SinkName, Sink0);
true -> Sink0
end,
Sinks1 = lists:keystore(SinkName, 1, Sinks, {SinkName, Sink1}),
@@ -243,6 +239,17 @@ configure_extra_sinks(Sinks, [SinkName | Rest]) ->
configure_extra_sinks(Sinks, []) ->
Sinks.
+default_sink_config(rabbit_log_upgrade_lager_event, Sink) ->
+ Handlers = lager_handlers(application:get_env(rabbit,
+ lager_handler_upgrade,
+ tty)),
+ lists:keystore(handlers, 1, Sink, {handlers, Handlers});
+default_sink_config(_, Sink) ->
+ lists:keystore(handlers, 1, Sink,
+ {handlers,
+ [{lager_forwarder_backend,
+ lager_util:make_internal_sink_name(lager)}]}).
+
list_expected_sinks() ->
case application:get_env(rabbit, lager_extra_sinks) of
{ok, List} ->
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index be5f0146b6..22181ce8b7 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -78,6 +78,7 @@ make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event;
make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event;
make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event;
make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event;
+make_internal_sink_name(rabbit_log_upgrade) -> rabbit_log_upgrade_lager_event;
make_internal_sink_name(Category) ->
lager_util:make_internal_sink_name(Category).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 51deed8597..9d1282b936 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -877,7 +877,7 @@ check_rabbit_consistency(Remote) ->
%% that a `reset' would leave it in. We cannot simply check if the
%% mnesia tables aren't there because restarted RAM nodes won't have
%% tables while still being non-virgin. What we do instead is to
-%% check if the mnesia directory is non existant or empty, with the
+%% check if the mnesia directory is non existent or empty, with the
%% exception of the cluster status files, which will be there thanks to
%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
is_virgin_node() ->
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 2d7e0f56b6..bcaaf117ff 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -70,13 +70,13 @@ rename(Node, NodeMapList) ->
ok = rabbit_mnesia:copy_db(mnesia_copy_dir()),
%% And make the actual changes
- rabbit_control_main:become(FromNode),
+ become(FromNode),
take_backup(before_backup_name()),
convert_backup(NodeMap, before_backup_name(), after_backup_name()),
ok = rabbit_file:write_term_file(rename_config_name(),
[{FromNode, ToNode}]),
convert_config_files(NodeMap),
- rabbit_control_main:become(ToNode),
+ become(ToNode),
restore_backup(after_backup_name()),
ok
after
@@ -267,3 +267,15 @@ transform_table(Table, Map, Key) ->
[Term] = mnesia:read(Table, Key, write),
ok = mnesia:write(Table, update_term(Map, Term), write),
transform_table(Table, Map, mnesia:next(Table, Key)).
+
+become(BecomeNode) ->
+ error_logger:tty(false),
+ case net_adm:ping(BecomeNode) of
+ pong -> exit({node_running, BecomeNode});
+ pang -> ok = net_kernel:stop(),
+ io:format(" * Impersonating node: ~s...", [BecomeNode]),
+ {ok, _} = rabbit_cli:start_distribution(BecomeNode),
+ io:format(" done~n", []),
+ Dir = mnesia:system_info(directory),
+ io:format(" * Mnesia directory : ~s~n", [Dir])
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 8e2b1c0d49..6c9eb92cff 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/4, successfully_recovered_state/1,
+-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
write/3, write_flow/3, read/2, contains/2, remove/2]).
@@ -63,7 +63,7 @@
%% the module for index ops,
%% rabbit_msg_store_ets_index by default
index_module,
- %% %% where are messages?
+ %% where are messages?
index_state,
%% current file name as number
current_file,
@@ -91,8 +91,6 @@
flying_ets,
%% set of dying clients
dying_clients,
- %% index of file positions for client death messages
- dying_client_index,
%% map of references of all registered clients
%% to callbacks
clients,
@@ -265,7 +263,7 @@
%% updated.
%%
%% On non-clean startup, we scan the files we discover, dealing with
-%% the possibilites of a crash having occured during a compaction
+%% the possibilities of a crash having occurred
%% (this consists of tidyup - the compaction is deliberately designed
%% such that data is duplicated on disk rather than risking it being
%% lost), and rebuild the file summary and index ETS table.
@@ -310,7 +308,7 @@
%% From this reasoning, we do have a bound on the number of times the
%% message is rewritten. From when it is inserted, there can be no
%% files inserted between it and the head of the queue, and the worst
-%% case is that everytime it is rewritten, it moves one position lower
+%% case is that every time it is rewritten, it moves one position lower
%% in the file (for it to stay at the same position requires that
%% there are no holes beneath it, which means truncate would be used
%% and so it would not be rewritten at all). Thus this seems to
@@ -352,7 +350,7 @@
%% because in the event of the same message being sent to several
%% different queues, there is the possibility of one queue writing and
%% removing the message before other queues write it at all. Thus
-%% accomodating 0-reference counts allows us to avoid unnecessary
+%% accommodating 0-reference counts allows us to avoid unnecessary
%% writes here. Of course, there are complications: the file to which
%% the message has already been written could be locked pending
%% deletion or GC, which means we have to rewrite the message as the
@@ -474,15 +472,20 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Server, Dir, ClientRefs, StartupFunState) ->
- gen_server2:start_link({local, Server}, ?MODULE,
- [Server, Dir, ClientRefs, StartupFunState],
+start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+ gen_server2:start_link(?MODULE,
+ [Name, Dir, ClientRefs, StartupFunState],
+ [{timeout, infinity}]).
+
+start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+ gen_server2:start_link({local, Name}, ?MODULE,
+ [Name, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
@@ -522,7 +525,7 @@ write_flow(MsgId, Msg,
%% rabbit_amqqueue_process process via the
%% rabbit_variable_queue. We are accessing the
%% rabbit_amqqueue_process process dictionary.
- credit_flow:send(whereis(Server), CreditDiscBound),
+ credit_flow:send(Server, CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -548,7 +551,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
-set_maximum_since_use(Server, Age) ->
+set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
@@ -699,27 +702,25 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
end.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
- dying_clients = DyingClients,
- dying_client_index = DyingIndex }) ->
- ets:delete(DyingIndex, CRef),
+ dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
- dying_clients = sets:del_element(CRef, DyingClients) }.
+ dying_clients = maps:remove(CRef, DyingClients) }.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, StartupFunState]) ->
+init([Name, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- Dir = filename:join(BaseDir, atom_to_list(Server)),
+ Dir = filename:join(BaseDir, atom_to_list(Name)),
- {ok, IndexModule} = application:get_env(msg_store_index_module),
- rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
+ {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
+ rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]),
AttemptFileSummaryRecovery =
case ClientRefs of
@@ -738,7 +739,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
- ClientRefs, Dir, Server),
+ ClientRefs, Dir),
Clients = dict:from_list(
[{CRef, {undefined, undefined, undefined}} ||
CRef <- ClientRefs1]),
@@ -755,10 +756,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
- DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
- [set, public, {keypos, #dying_client.client_ref}]),
- {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+ {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),
{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
@@ -787,8 +786,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
- dying_clients = sets:new(),
- dying_client_index = DyingIndex,
+ dying_clients = #{},
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -866,14 +864,14 @@ handle_call({contains, MsgId}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients,
- dying_client_index = DyingIndex,
current_file_handle = CurHdl,
current_file = CurFile }) ->
- DyingClients1 = sets:add_element(CRef, DyingClients),
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
- file = CurFile,
- offset = CurOffset}),
+ DyingClients1 = maps:put(CRef,
+ #dying_client{client_ref = CRef,
+ file = CurFile,
+ offset = CurOffset},
+ DyingClients),
noreply(State #msstate { dying_clients = DyingClients1 });
handle_cast({client_delete, CRef},
@@ -995,12 +993,25 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State2
end,
State3 = close_all_handles(State1),
- ok = store_file_summary(FileSummaryEts, Dir),
+ case store_file_summary(FileSummaryEts, Dir) of
+ ok -> ok;
+ {error, FSErr} ->
+ rabbit_log:error("Unable to store file summary"
+ " for vhost message store for directory ~p~n"
+ "Error: ~p~n",
+ [Dir, FSErr])
+ end,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
- ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
- {index_module, IndexModule}], Dir),
+ case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ {index_module, IndexModule}], Dir) of
+ ok -> ok;
+ {error, RTErr} ->
+ rabbit_log:error("Unable to save message store recovery terms"
+ " for directory ~p~nError: ~p~n",
+ [Dir, RTErr])
+ end,
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -1357,17 +1368,15 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
- State = #msstate { dying_clients = DyingClients,
- dying_client_index = DyingIndex }) ->
- case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
- {false, Location} ->
+ State = #msstate{dying_clients = DyingClients}) ->
+ case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of
+ {error, Location} ->
{false, Location};
- {true, not_found} ->
+ {{ok, _}, not_found} ->
{true, not_found};
- {true, #msg_location { file = File, offset = Offset,
- ref_count = RefCount } = Location} ->
- [#dying_client { file = DeathFile, offset = DeathOffset }] =
- ets:lookup(DyingIndex, CRef),
+ {{ok, Client}, #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } = Location} ->
+ #dying_client{file = DeathFile, offset = DeathOffset} = Client,
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
{true, _} -> true;
{false, 0} -> false_if_increment;
@@ -1538,16 +1547,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) ->
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
- rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) ->
+ rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]),
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+ rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch~n",
- [Server | ErrorArgs]),
+ [Dir | ErrorArgs]),
{false, IndexModule:new(Dir), []}
end,
case read_recovery_terms(Dir) of
@@ -1582,7 +1591,7 @@ read_recovery_terms(Dir) ->
end.
store_file_summary(Tid, Dir) ->
- ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
+ ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
[{extended_info, [object_count]}]).
recover_file_summary(false, _Dir) ->
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 76ef112069..0e8b7174e2 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -74,6 +74,12 @@ delete_by_file(File, State) ->
ok.
terminate(#state { table = MsgLocations, dir = Dir }) ->
- ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
- [{extended_info, [object_count]}]),
+ case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
+ [{extended_info, [object_count]}]) of
+ ok -> ok;
+ {error, Err} ->
+ rabbit_log:error("Unable to save message store index"
+ " for directory ~p.~nError: ~p~n",
+ [Dir, Err])
+ end,
ets:delete(MsgLocations).
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
new file mode 100644
index 0000000000..0209e88cf7
--- /dev/null
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -0,0 +1,93 @@
+-module(rabbit_msg_store_vhost_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/3, init/1, add_vhost/2, delete_vhost/2,
+ client_init/5, successfully_recovered_state/2]).
+
+%% Internal
+-export([start_store_for_vhost/4]).
+
+start_link(Name, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs);
+ VhostsClientRefs == undefined ->
+ supervisor2:start_link({local, Name}, ?MODULE,
+ [Name, VhostsClientRefs, StartupFunState]).
+
+init([Name, VhostsClientRefs, StartupFunState]) ->
+ ets:new(Name, [named_table, public]),
+ {ok, {{simple_one_for_one, 1, 1},
+ [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost,
+ [Name, VhostsClientRefs, StartupFunState]},
+ transient, infinity, supervisor, [rabbit_msg_store]}]}}.
+
+
+add_vhost(Name, VHost) ->
+ supervisor2:start_child(Name, [VHost]).
+
+start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) ->
+ case vhost_store_pid(Name, VHost) of
+ no_pid ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ ok = rabbit_file:ensure_dir(VHostDir),
+ rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
+ VhostRefs = refs_for_vhost(VHost, VhostsClientRefs),
+ case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of
+ {ok, Pid} ->
+ ets:insert(Name, {VHost, Pid}),
+ {ok, Pid};
+ Other -> Other
+ end;
+ Pid when is_pid(Pid) ->
+ {error, {already_started, Pid}}
+ end.
+
+refs_for_vhost(_, undefined) -> undefined;
+refs_for_vhost(VHost, Refs) ->
+ case maps:find(VHost, Refs) of
+ {ok, Val} -> Val;
+ error -> []
+ end.
+
+
+delete_vhost(Name, VHost) ->
+ case vhost_store_pid(Name, VHost) of
+ no_pid -> ok;
+ Pid when is_pid(Pid) ->
+ supervisor2:terminate_child(Name, Pid),
+ cleanup_vhost_store(Name, VHost, Pid)
+ end,
+ ok.
+
+client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
+ VHostPid = maybe_start_store_for_vhost(Name, VHost),
+ rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun).
+
+maybe_start_store_for_vhost(Name, VHost) ->
+ case add_vhost(Name, VHost) of
+ {ok, Pid} -> Pid;
+ {error, {already_started, Pid}} -> Pid;
+ Error -> throw(Error)
+ end.
+
+vhost_store_pid(Name, VHost) ->
+ case ets:lookup(Name, VHost) of
+ [] -> no_pid;
+ [{VHost, Pid}] ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false ->
+ cleanup_vhost_store(Name, VHost, Pid),
+ no_pid
+ end
+ end.
+
+cleanup_vhost_store(Name, VHost, Pid) ->
+ ets:delete_object(Name, {VHost, Pid}).
+
+successfully_recovered_state(Name, VHost) ->
+ case vhost_store_pid(Name, VHost) of
+ no_pid ->
+ throw({message_store_not_started, Name, VHost});
+ Pid when is_pid(Pid) ->
+ rabbit_msg_store:successfully_recovered_state(Pid)
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 9da68b7640..54f4180244 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -67,22 +67,15 @@ ensure(FileJustChanged0) ->
{error, {enabled_plugins_mismatch, FileJustChanged, OurFile}}
end.
+%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- case application:get_env(rabbit, plugins_expand_dir) of
- {ok, ExpandDir} ->
- case filelib:is_dir(ExpandDir) of
- true ->
- rabbit_log:info(
- "\"~s\" is no longer used to expand plugins.~n"
- "RabbitMQ still manages this directory "
- "but will stop doing so in the future.", [ExpandDir]),
-
- _ = delete_recursively(ExpandDir);
- false ->
- ok
- end;
- undefined ->
- ok
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
end,
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
@@ -135,61 +128,10 @@ extract_schema(#plugin{type = dir, location = Location}, SchemaDir) ->
%% @doc Lists the plugins which are currently running.
active() ->
- LoadedPluginNames = maybe_keep_required_deps(false, loaded_plugin_names()),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ InstalledPlugins = plugin_names(list(ExpandDir)),
[App || {App, _, _} <- rabbit_misc:which_applications(),
- lists:member(App, LoadedPluginNames)].
-
-loaded_plugin_names() ->
- {ok, PluginsPath} = application:get_env(rabbit, plugins_dir),
- PluginsDirs = split_path(PluginsPath),
- lists:flatmap(
- fun(PluginsDir) ->
- PluginsDirComponents = filename:split(PluginsDir),
- loaded_plugin_names(code:get_path(), PluginsDirComponents, [])
- end,
- PluginsDirs).
-
-loaded_plugin_names([Path | OtherPaths], PluginsDirComponents, PluginNames) ->
- case lists:sublist(filename:split(Path), length(PluginsDirComponents)) of
- PluginsDirComponents ->
- case build_plugin_name_from_code_path(Path) of
- undefined ->
- loaded_plugin_names(
- OtherPaths, PluginsDirComponents, PluginNames);
- PluginName ->
- loaded_plugin_names(
- OtherPaths, PluginsDirComponents,
- [list_to_atom(PluginName) | PluginNames])
- end;
- _ ->
- loaded_plugin_names(OtherPaths, PluginsDirComponents, PluginNames)
- end;
-loaded_plugin_names([], _, PluginNames) ->
- PluginNames.
-
-build_plugin_name_from_code_path(Path) ->
- AppPath = case filelib:is_dir(Path) of
- true ->
- case filelib:wildcard(filename:join(Path, "*.app")) of
- [AP | _] -> AP;
- [] -> undefined
- end;
- false ->
- EZ = filename:dirname(filename:dirname(Path)),
- case filelib:is_regular(EZ) of
- true ->
- case find_app_path_in_ez(EZ) of
- {ok, AP} -> AP;
- _ -> undefined
- end;
- _ ->
- undefined
- end
- end,
- case AppPath of
- undefined -> undefined;
- _ -> filename:basename(AppPath, ".app")
- end.
+ lists:member(App, InstalledPlugins)].
%% @doc Get the list of plugins which are ready to be enabled.
list(PluginsPath) ->
@@ -279,19 +221,25 @@ running_plugins() ->
%%----------------------------------------------------------------------------
prepare_plugins(Enabled) ->
- AllPlugins = installed_plugins(),
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ AllPlugins = list(PluginsDistDir),
Wanted = dependencies(false, Enabled, AllPlugins),
WantedPlugins = lookup_plugins(Wanted, AllPlugins),
{ValidPlugins, Problems} = validate_plugins(WantedPlugins),
maybe_warn_about_invalid_plugins(Problems),
+ case filelib:ensure_dir(ExpandDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
+ [ExpandDir, E2]}})
+ end,
+ [prepare_plugin(Plugin, ExpandDir) || Plugin <- ValidPlugins],
- [prepare_dir_plugin(ValidPlugin) || ValidPlugin <- ValidPlugins],
+ [prepare_dir_plugin(PluginAppDescPath) ||
+ PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")],
Wanted.
-installed_plugins() ->
- {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
- list(PluginsDistDir).
-
maybe_warn_about_invalid_plugins([]) ->
ok;
maybe_warn_about_invalid_plugins(InvalidPlugins) ->
@@ -404,60 +352,40 @@ is_version_supported(Version, ExpectedVersions) ->
end.
clean_plugins(Plugins) ->
- [clean_plugin(Plugin) || Plugin <- Plugins].
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins].
-clean_plugin(Plugin) ->
+clean_plugin(Plugin, ExpandDir) ->
{ok, Mods} = application:get_key(Plugin, modules),
- PluginEbinDir = code:lib_dir(Plugin, ebin),
-
application:unload(Plugin),
[begin
code:soft_purge(Mod),
code:delete(Mod),
false = code:is_loaded(Mod)
end || Mod <- Mods],
-
- code:del_path(PluginEbinDir).
-
-plugin_ebin_dir(#plugin{type = ez, location = Location}) ->
- case find_app_path_in_ez(Location) of
- {ok, AppPath} ->
- filename:join(Location, filename:dirname(AppPath));
- {error, Reason} ->
- {error, Reason}
- end;
-plugin_ebin_dir(#plugin{type = dir, location = Location}) ->
- filename:join(Location, "ebin").
-
-prepare_dir_plugin(#plugin{name = Name} = Plugin) ->
- PluginEbinDir = case plugin_ebin_dir(Plugin) of
- {error, Reason} ->
- throw({plugin_ebin_dir_not_found, Name, Reason});
- Dir ->
- Dir
- end,
- case code:add_patha(PluginEbinDir) of
- true ->
- case filelib:wildcard(PluginEbinDir++ "/*.beam") of
- [] ->
+ delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
+
+prepare_dir_plugin(PluginAppDescPath) ->
+ PluginEbinDir = filename:dirname(PluginAppDescPath),
+ Plugin = filename:basename(PluginAppDescPath, ".app"),
+ code:add_patha(PluginEbinDir),
+ case filelib:wildcard(PluginEbinDir++ "/*.beam") of
+ [] ->
+ ok;
+ [BeamPath | _] ->
+ Module = list_to_atom(filename:basename(BeamPath, ".beam")),
+ case code:ensure_loaded(Module) of
+ {module, _} ->
ok;
- [BeamPath | _] ->
- Module = list_to_atom(filename:basename(BeamPath, ".beam")),
- case code:ensure_loaded(Module) of
- {module, _} ->
- ok;
- {error, badfile} ->
- rabbit_log:error("Failed to enable plugin \"~s\": "
- "it may have been built with an "
- "incompatible (more recent?) "
- "version of Erlang~n", [Name]),
- throw({plugin_built_with_incompatible_erlang, Name});
- Error ->
- throw({plugin_module_unloadable, Name, Error})
- end
- end;
- {error, bad_directory} ->
- throw({plugin_ebin_path_incorrect, Name, PluginEbinDir})
+ {error, badfile} ->
+ rabbit_log:error("Failed to enable plugin \"~s\": "
+ "it may have been built with an "
+ "incompatible (more recent?) "
+ "version of Erlang~n", [Plugin]),
+ throw({plugin_built_with_incompatible_erlang, Plugin});
+ Error ->
+ throw({plugin_module_unloadable, Plugin, Error})
+ end
end.
%%----------------------------------------------------------------------------
@@ -468,6 +396,12 @@ delete_recursively(Fn) ->
{error, {Path, E}} -> {error, {cannot_delete, Path, E}}
end.
+prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) ->
+ zip:unzip(Location, [{cwd, ExpandDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ ExpandDir) ->
+ rabbit_file:recursive_copy(Location, filename:join([ExpandDir, Name])).
+
plugin_info({ez, EZ}) ->
case read_app_file(EZ) of
{application, Name, Props} -> mkplugin(Name, Props, ez, EZ);
@@ -494,12 +428,14 @@ mkplugin(Name, Props, Type, Location) ->
broker_version_requirements = BrokerVersions,
dependency_version_requirements = DepsVersions}.
-find_app_path_in_ez(EZ) ->
+read_app_file(EZ) ->
case zip:list_dir(EZ) of
{ok, [_|ZippedFiles]} ->
case find_app_files(ZippedFiles) of
[AppPath|_] ->
- {ok, AppPath};
+ {ok, [{AppPath, AppFile}]} =
+ zip:extract(EZ, [{file_list, [AppPath]}, memory]),
+ parse_binary(AppFile);
[] ->
{error, no_app_file}
end;
@@ -507,16 +443,6 @@ find_app_path_in_ez(EZ) ->
{error, {invalid_ez, Reason}}
end.
-read_app_file(EZ) ->
- case find_app_path_in_ez(EZ) of
- {ok, AppPath} ->
- {ok, [{AppPath, AppFile}]} =
- zip:extract(EZ, [{file_list, [AppPath]}, memory]),
- parse_binary(AppFile);
- {error, Reason} ->
- {error, Reason}
- end.
-
find_app_files(ZippedFiles) ->
{ok, RE} = re:compile("^.*/ebin/.*.app$"),
[Path || {zip_file, Path, _, _, _, _} <- ZippedFiles,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8b96bbffbd..793eb3e514 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,6 +23,10 @@
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
+-export([scan_queue_segments/3]).
+
+%% Migrates from global to per-vhost message stores
+-export([move_to_per_vhost_stores/1, update_recovery_term/2]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -123,7 +127,7 @@
-define(SEGMENT_EXTENSION, ".idx").
%% TODO: The segment size would be configurable, but deriving all the
-%% other values is quite hairy and quite possibly noticably less
+%% other values is quite hairy and quite possibly noticeably less
%% efficient, depending on how clever the compiler is when it comes to
%% binary generation/matching with constant vs variable lengths.
@@ -475,11 +479,10 @@ start(DurableQueueNames) ->
end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
- QueuesDir = queues_dir(),
rabbit_file:recursive_delete(
- [filename:join(QueuesDir, DirName) ||
- DirName <- all_queue_directory_names(QueuesDir),
- not sets:is_element(DirName, DurableDirectories)]),
+ [DirName ||
+ DirName <- all_queue_directory_names(),
+ not sets:is_element(filename:basename(DirName), DurableDirectories)]),
rabbit_recovery_terms:clear(),
@@ -490,12 +493,9 @@ start(DurableQueueNames) ->
stop() -> rabbit_recovery_terms:stop().
-all_queue_directory_names(Dir) ->
- case rabbit_file:list_dir(Dir) of
- {ok, Entries} -> [E || E <- Entries,
- rabbit_file:is_dir(filename:join(Dir, E))];
- {error, enoent} -> []
- end.
+all_queue_directory_names() ->
+ filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
+ "queues", "*"])).
%%----------------------------------------------------------------------------
%% startup and shutdown
@@ -508,14 +508,20 @@ erase_index_dir(Dir) ->
end.
blank_state(QueueName) ->
- blank_state_dir(
- filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+ blank_state_dir(queue_dir(QueueName)).
blank_state_dir(Dir) ->
blank_state_dir_funs(Dir,
fun (_) -> ok end,
fun (_) -> ok end).
+queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
+ %% Queue directory is
+ %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ QueueDir = queue_name_to_dir_name(QueueName),
+ filename:join([VHostDir, "queues", QueueDir]).
+
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
@@ -629,8 +635,8 @@ queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
rabbit_misc:format("~.36B", [Num]).
-queues_dir() ->
- filename:join(rabbit_mnesia:dir(), "queues").
+queues_base_dir() ->
+ rabbit_mnesia:dir().
%%----------------------------------------------------------------------------
%% msg store startup delta function
@@ -660,20 +666,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = blank_state(QueueName),
- ok = scan_segments(
+ ok = scan_queue_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
- end, ok, State),
+ end, ok, QueueName),
ok = gatherer:finish(Gatherer).
-scan_segments(Fun, Acc, State) ->
- State1 = #qistate { segments = Segments, dir = Dir } =
- recover_journal(State),
+scan_queue_segments(Fun, Acc, QueueName) ->
+ State = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(blank_state(QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
@@ -682,8 +687,8 @@ scan_segments(Fun, Acc, State) ->
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
- end, Acc, all_segment_nums(State1)),
- {_SegmentCounts, _State} = terminate(State1),
+ end, Acc, all_segment_nums(State)),
+ {_SegmentCounts, _State} = terminate(State),
Result.
%%----------------------------------------------------------------------------
@@ -1353,15 +1358,13 @@ store_msg_segment(_) ->
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
- QueuesDir = queues_dir(),
- QueueDirNames = all_queue_directory_names(QueuesDir),
+ QueueDirNames = all_queue_directory_names(),
{ok, Gatherer} = gatherer:start_link(),
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
fun () ->
- transform_queue(filename:join(QueuesDir, QueueDirName),
- Gatherer, Funs)
+ transform_queue(QueueDirName, Gatherer, Funs)
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
@@ -1402,3 +1405,21 @@ drive_transform_fun(Fun, Hdl, Contents) ->
{Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
drive_transform_fun(Fun, Hdl, Contents1)
end.
+
+move_to_per_vhost_stores(#resource{} = QueueName) ->
+ OldQueueDir = filename:join([queues_base_dir(), "queues",
+ queue_name_to_dir_name(QueueName)]),
+ NewQueueDir = queue_dir(QueueName),
+ case rabbit_file:is_dir(OldQueueDir) of
+ true ->
+ ok = rabbit_file:ensure_dir(NewQueueDir),
+ ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
+ false ->
+ rabbit_log:info("Queue index directory not found for queue ~p~n",
+ [QueueName])
+ end,
+ ok.
+
+update_recovery_term(#resource{} = QueueName, Term) ->
+ Key = queue_name_to_dir_name(QueueName),
+ rabbit_recovery_terms:store(Key, Term).
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 94018a5b54..cee5408f0a 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -27,7 +27,7 @@
%%
%% The most obvious use case for runtime parameters is policies but
%% there are others:
-%%
+%%
%% * Plugin-specific parameters that only make sense at runtime,
%% e.g. Federation and Shovel link settings
%% * Exchange and queue decorators
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index ad70540e5b..a457938dc9 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2, start_child/3,
+-export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4,
start_supervisor_child/1, start_supervisor_child/2,
start_supervisor_child/3,
start_restartable_child/1, start_restartable_child/2,
@@ -37,6 +37,7 @@
-spec start_child(atom()) -> 'ok'.
-spec start_child(atom(), [any()]) -> 'ok'.
-spec start_child(atom(), atom(), [any()]) -> 'ok'.
+-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'.
-spec start_supervisor_child(atom()) -> 'ok'.
-spec start_supervisor_child(atom(), [any()]) -> 'ok'.
-spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'.
@@ -60,6 +61,13 @@ start_child(ChildId, Mod, Args) ->
{ChildId, {Mod, start_link, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
+start_child(ChildId, Mod, Fun, Args) ->
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, Fun, Args},
+ transient, ?WORKER_WAIT, worker, [Mod]})).
+
+
start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index f88b7cc73f..95af84de39 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -17,6 +17,7 @@
-module(rabbit_upgrade).
-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
+ maybe_migrate_queues_to_per_vhost_storage/0,
nodes_running/1, secondary_upgrade/1]).
-include("rabbit.hrl").
@@ -98,7 +99,7 @@ ensure_backup_taken() ->
_ -> ok
end;
true ->
- error("Found lock file at ~s.
+ rabbit_log:error("Found lock file at ~s.
Either previous upgrade is in progress or has failed.
Database backup path: ~s",
[lock_filename(), backup_dir()]),
@@ -107,6 +108,7 @@ ensure_backup_taken() ->
take_backup() ->
BackupDir = backup_dir(),
+ info("upgrades: Backing up mnesia dir to ~p~n", [BackupDir]),
case rabbit_mnesia:copy_db(BackupDir) of
ok -> info("upgrades: Mnesia dir backed up to ~p~n",
[BackupDir]);
@@ -126,7 +128,9 @@ remove_backup() ->
maybe_upgrade_mnesia() ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
ok = rabbit_mnesia_rename:maybe_finish(AllNodes),
- case rabbit_version:upgrades_required(mnesia) of
+ %% Mnesia upgrade is the first upgrade scope,
+ %% so we should create a backup here if there are any upgrades
+ case rabbit_version:all_upgrades_required([mnesia, local, message_store]) of
{error, starting_from_scratch} ->
ok;
{error, version_not_available} ->
@@ -142,10 +146,15 @@ maybe_upgrade_mnesia() ->
ok;
{ok, Upgrades} ->
ensure_backup_taken(),
- ok = case upgrade_mode(AllNodes) of
- primary -> primary_upgrade(Upgrades, AllNodes);
- secondary -> secondary_upgrade(AllNodes)
- end
+ run_mnesia_upgrades(proplists:get_value(mnesia, Upgrades, []),
+ AllNodes)
+ end.
+
+run_mnesia_upgrades([], _) -> ok;
+run_mnesia_upgrades(Upgrades, AllNodes) ->
+ case upgrade_mode(AllNodes) of
+ primary -> primary_upgrade(Upgrades, AllNodes);
+ secondary -> secondary_upgrade(AllNodes)
end.
upgrade_mode(AllNodes) ->
@@ -243,15 +252,32 @@ maybe_upgrade_local() ->
{ok, []} -> ensure_backup_removed(),
ok;
{ok, Upgrades} -> mnesia:stop(),
- ensure_backup_taken(),
ok = apply_upgrades(local, Upgrades,
fun () -> ok end),
- ensure_backup_removed(),
ok
end.
%% -------------------------------------------------------------------
+maybe_migrate_queues_to_per_vhost_storage() ->
+ Result = case rabbit_version:upgrades_required(message_store) of
+ {error, version_not_available} -> version_not_available;
+ {error, starting_from_scratch} -> starting_from_scratch;
+ {error, _} = Err -> throw(Err);
+ {ok, []} -> ok;
+ {ok, Upgrades} -> apply_upgrades(message_store,
+ Upgrades,
+ fun() -> ok end),
+ ok
+ end,
+ %% Message store upgrades should be
+ %% the last group.
+ %% Backup can be deleted here.
+ ensure_backup_removed(),
+ Result.
+
+%% -------------------------------------------------------------------
+
apply_upgrades(Scope, Upgrades, Fun) ->
ok = rabbit_file:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bbec4c749d..9dbd4fdbf2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -30,9 +30,18 @@
-export([start/1, stop/0]).
+%% exported for parallel map
+-export([add_vhost_msg_store/1]).
+
%% exported for testing only
-export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([move_messages_to_vhost_store/0]).
+-export([stop_vhost_msg_store/1]).
+-include_lib("stdlib/include/qlc.hrl").
+
+-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
@@ -334,8 +343,11 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
+-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
-define(QUEUE, lqueue).
-include("rabbit.hrl").
@@ -344,6 +356,9 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
+-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).
+
+-compile(export_all).
-type seq_id() :: non_neg_integer().
@@ -453,31 +468,61 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
- start_msg_store(
- [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
+ %% Group recovery terms by vhost.
+ {[], VhostRefs} = lists:foldl(
+ fun
+ %% We need to skip a queue name
+ (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
+ {QNames, VhostRefs};
+ (Terms, {[QueueName | QNames], VhostRefs}) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {QNames, VhostRefs};
+ Ref ->
+ #resource{virtual_host = VHost} = QueueName,
+ Refs = case maps:find(VHost, VhostRefs) of
+ {ok, Val} -> Val;
+ error -> []
+ end,
+ {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
+ end
+ end,
+ {DurableQueues, #{}},
+ AllTerms),
+ start_msg_store(VhostRefs, StartFunState),
{ok, AllTerms}.
stop() ->
ok = stop_msg_store(),
ok = rabbit_queue_index:stop().
-start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ [?TRANSIENT_MSG_STORE_SUP,
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]).
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]),
+ %% Start message store for all known vhosts
+ VHosts = rabbit_vhost:list(),
+ lists:foreach(fun(VHost) ->
+ add_vhost_msg_store(VHost)
+ end,
+ VHosts),
+ ok.
+
+add_vhost_msg_store(VHost) ->
+    rabbit_log:info("Starting message store for vhost ~p~n", [VHost]),
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+    rabbit_log:info("Message store is started for vhost ~p~n", [VHost]).
stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
+ ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
+ ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
+
+stop_vhost_msg_store(VHost) ->
+ rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
+ rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+ ok.
init(Queue, Recover, Callback) ->
init(
@@ -492,22 +537,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback);
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
+ MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+ msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
+ AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
@@ -515,12 +564,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback),
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
+ undefined, AsyncCallback,
+ VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store_vhost_sup:successfully_recovered_state(
+ ?PERSISTENT_MSG_STORE_SUP, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -1195,14 +1246,17 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback).
+ Callback, VHost).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
+ rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
+ end,
+ VHost).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -2673,9 +2727,180 @@ multiple_routing_keys() ->
%% Assumes message store is not running
transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+ transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+
+move_messages_to_vhost_store() ->
+ log_upgrade("Moving messages to per-vhost message store"),
+ Queues = list_persistent_queues(),
+ %% Move the queue index for each persistent queue to the new store
+ lists:foreach(
+ fun(Queue) ->
+ #amqqueue{name = QueueName} = Queue,
+ rabbit_queue_index:move_to_per_vhost_stores(QueueName)
+ end,
+ Queues),
+ %% Legacy (global) msg_store may require recovery.
+ %% This upgrade step should only be started
+ %% if we are upgrading from a pre-3.7.0 version.
+ {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues),
+
+ OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
+ %% New store should not be recovered.
+ NewStoreSup = start_new_store_sup(),
+ Vhosts = rabbit_vhost:list(),
+ lists:foreach(fun(VHost) ->
+ rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost)
+ end,
+ Vhosts),
+ MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
+ ?QUEUE_MIGRATION_BATCH_SIZE),
+ in_batches(MigrationBatchSize,
+ {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]},
+ QueuesWithTerms,
+ "Migrating batch ~p of ~p queues ~n",
+ "Batch ~p of ~p queues migrated ~n"),
+
+ log_upgrade("Message store migration finished"),
+ delete_old_store(OldStore),
+
+ ok = rabbit_queue_index:stop(),
+ ok = rabbit_sup:stop_child(NewStoreSup),
+ ok.
+
+in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
+ in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
+
+in_batches(_, _, _, [], _, _) -> ok;
+in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
+ {Batch, Tail} = case Size > length(List) of
+ true -> {List, []};
+ false -> lists:split(Size, List)
+ end,
+ log_upgrade(MessageStart, [BatchNum, Size]),
+ {M, F, A} = MFA,
+ Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
+ lists:foreach(fun(Key) ->
+ case rpc:yield(Key) of
+ {badrpc, Err} -> throw(Err);
+ _ -> ok
+ end
+ end,
+ Keys),
+ log_upgrade(MessageEnd, [BatchNum, Size]),
+ in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
+
+migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) ->
+ log_upgrade_verbose(
+ "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
+ [Name, VHost]),
+ OldStoreClient = get_global_store_client(OldStore),
+ NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup),
+ %% WARNING: During scan_queue_segments queue index state is being recovered
+ %% and terminated. This can cause side effects!
+ rabbit_queue_index:scan_queue_segments(
+ %% We migrate only persistent messages which are found in message store
+ %% and are not acked yet
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC)
+ when is_binary(MsgId) ->
+ migrate_message(MsgId, OldC, NewStoreClient);
+ (_SeqId, _MsgId, _MsgProps,
+ _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
+ OldC
+ end,
+ OldStoreClient,
+ QueueName),
+ rabbit_msg_store:client_terminate(OldStoreClient),
+ rabbit_msg_store:client_terminate(NewStoreClient),
+ NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),
+ case RecoveryTerm of
+ non_clean_shutdown -> ok;
+ Term when is_list(Term) ->
+ NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm,
+ {persistent_ref, NewClientRef}),
+ rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm)
+ end,
+ log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]),
+ {QueueName, NewClientRef}.
+
+migrate_message(MsgId, OldC, NewC) ->
+ case rabbit_msg_store:read(MsgId, OldC) of
+ {{ok, Msg}, OldC1} ->
+ ok = rabbit_msg_store:write(MsgId, Msg, NewC),
+ OldC1;
+ _ -> OldC
+ end.
+
+get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) ->
+ rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end,
+ VHost).
+
+get_global_store_client(OldStore) ->
+ rabbit_msg_store:client_init(OldStore,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end).
+
+list_persistent_queues() ->
+ Node = node(),
+ mnesia:async_dirty(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
+ pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node,
+ mnesia:read(rabbit_queue, Name, read) =:= []]))
+ end).
+
+start_recovery_terms(Queues) ->
+ QueueNames = [Name || #amqqueue{name = Name} <- Queues],
+ {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
+ Refs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ {lists:zip(QueueNames, AllTerms), Refs, StartFunState}.
+
+run_old_persistent_store(Refs, StartFunState) ->
+ OldStoreName = ?PERSISTENT_MSG_STORE,
+ ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link,
+ [OldStoreName, rabbit_mnesia:dir(),
+ Refs, StartFunState]),
+ OldStoreName.
+
+start_new_store_sup() ->
+ % Start persistent store sup without recovery.
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
+ rabbit_msg_store_vhost_sup,
+ [?PERSISTENT_MSG_STORE_SUP,
+ undefined, {fun (ok) -> finished end, ok}]),
+ ?PERSISTENT_MSG_STORE_SUP.
+
+delete_old_store(OldStore) ->
+ ok = rabbit_sup:stop_child(OldStore),
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
+ %% Delete old transient store as well
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).
+
+log_upgrade(Msg) ->
+ log_upgrade(Msg, []).
+
+log_upgrade(Msg, Args) ->
+ rabbit_log:info("message_store upgrades: " ++ Msg, Args).
+
+log_upgrade_verbose(Msg) ->
+ log_upgrade_verbose(Msg, []).
+
+log_upgrade_verbose(Msg, Args) ->
+ rabbit_log_upgrade:info(Msg, Args).
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index a27f0aca00..4e2edd19eb 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -18,7 +18,8 @@
-export([recorded/0, matches/2, desired/0, desired_for_scope/1,
record_desired/0, record_desired_for_scope/1,
- upgrades_required/1, check_version_consistency/3,
+ upgrades_required/1, all_upgrades_required/1,
+ check_version_consistency/3,
check_version_consistency/4, check_otp_consistency/1,
version_error/3]).
@@ -117,6 +118,30 @@ upgrades_required(Scope) ->
end, Scope)
end.
+all_upgrades_required(Scopes) ->
+ case recorded() of
+ {error, enoent} ->
+ case filelib:is_file(rabbit_guid:filename()) of
+ false -> {error, starting_from_scratch};
+ true -> {error, version_not_available}
+ end;
+ {ok, _} ->
+ lists:foldl(
+ fun
+ (_, {error, Err}) -> {error, Err};
+ (Scope, {ok, Acc}) ->
+ case upgrades_required(Scope) of
+ %% Lift errors from any scope.
+ {error, Err} -> {error, Err};
+ %% Filter non-upgradable scopes
+ {ok, []} -> {ok, Acc};
+ {ok, Upgrades} -> {ok, [{Scope, Upgrades} | Acc]}
+ end
+ end,
+ {ok, []},
+ Scopes)
+ end.
+
%% -------------------------------------------------------------------
with_upgrade_graph(Fun, Scope) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 213dbaaa0c..6edb62425b 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -23,7 +23,8 @@
-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-
+-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
+-export([purge_messages/1]).
-spec add(rabbit_types:vhost()) -> 'ok'.
-spec delete(rabbit_types:vhost()) -> 'ok'.
@@ -95,6 +96,16 @@ delete(VHostPath) ->
[ok = Fun() || Fun <- Funs],
ok.
+purge_messages(VHost) ->
+ VhostDir = msg_store_dir_path(VHost),
+ rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
+ %% Message store is stopped to close file handles
+ rabbit_variable_queue:stop_vhost_msg_store(VHost),
+ ok = rabbit_file:recursive_delete([VhostDir]),
+ %% Ensure the store is terminated even if it was restarted during the delete operation
+ %% above.
+ rabbit_variable_queue:stop_vhost_msg_store(VHost).
+
assert_benign(ok) -> ok;
assert_benign({ok, _}) -> ok;
assert_benign({error, not_found}) -> ok;
@@ -120,6 +131,7 @@ internal_delete(VHostPath) ->
Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
+ purge_messages(VHostPath),
Fs1 ++ Fs2.
exists(VHostPath) ->
@@ -170,6 +182,23 @@ set_limits(VHost = #vhost{}, undefined) ->
set_limits(VHost = #vhost{}, Limits) ->
VHost#vhost{limits = Limits}.
+
+dir(Vhost) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Vhost)),
+ rabbit_misc:format("~.36B", [Num]).
+
+msg_store_dir_path(VHost) ->
+ EncodedName = list_to_binary(dir(VHost)),
+ rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(),
+ EncodedName])).
+
+msg_store_dir_wildcard() ->
+ rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])).
+
+msg_store_dir_base() ->
+ Dir = rabbit_mnesia:dir(),
+ filename:join([Dir, "msg_stores", "vhosts"]).
+
%%----------------------------------------------------------------------------
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
@@ -188,3 +217,4 @@ info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid).
info_all(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()).
+
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index eae7119007..59c63022d8 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -42,7 +42,7 @@ memory() ->
|| Names <- distinguished_interesting_sups()],
Mnesia = mnesia_memory(),
- MsgIndexETS = ets_memory([msg_store_persistent, msg_store_transient]),
+ MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]),
MetricsETS = ets_memory([rabbit_metrics]),
MetricsProc = try
[{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
@@ -149,7 +149,7 @@ interesting_sups() ->
[[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()].
interesting_sups0() ->
- MsgIndexProcs = [msg_store_transient, msg_store_persistent],
+ MsgIndexProcs = [msg_store_transient_vhost, msg_store_persistent_vhost],
MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),
[MsgIndexProcs, MgmtDbProcs, PluginProcs].
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 4407a24e7f..124fda47b1 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -111,8 +111,8 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
-define(QUEUE, lqueue).
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
@@ -215,14 +215,27 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
- start_msg_store(
- [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- StartFunState),
+ %% Group recovery terms by vhost.
+ {[], VhostRefs} = lists:foldl(
+ fun
+ %% We need to skip a queue name
+ (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
+ {QNames, VhostRefs};
+ (Terms, {[QueueName | QNames], VhostRefs}) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {QNames, VhostRefs};
+ Ref ->
+ #resource{virtual_host = VHost} = QueueName,
+ Refs = case maps:find(VHost, VhostRefs) of
+ {ok, Val} -> Val;
+ error -> []
+ end,
+ {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
+ end
+ end,
+ {DurableQueues, #{}},
+ AllTerms),
+ start_msg_store(VhostRefs, StartFunState),
{ok, AllTerms}.
stop() ->
@@ -230,12 +243,21 @@ stop() ->
ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]).
+ Refs, StartFunState]),
+ %% Start message store for all known vhosts
+ VHosts = rabbit_vhost:list(),
+ lists:foreach(
+ fun(VHost) ->
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
+ end,
+ VHosts),
+ ok.
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
@@ -254,22 +276,26 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback);
+ MsgOnDiskFun, AsyncCallback,
+ VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback),
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
@@ -278,11 +304,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback),
+ undefined, AsyncCallback,
+ VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -957,14 +984,16 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback).
+ Callback, VHost).
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
+ rabbit_msg_store_vhost_sup:client_init(
+ MsgStore, Ref, MsgOnDiskFun,
+ fun () -> Callback(?MODULE, CloseFDsFun) end,
+ VHost).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index eac5fa3683..9b605d07ae 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -644,6 +644,7 @@ assert_not_clustered(Node) ->
assert_failure(Fun) ->
case catch Fun() of
+ {error, Code, Reason} -> Reason;
{error, Reason} -> Reason;
{error_string, Reason} -> Reason;
{badrpc, {'EXIT', Reason}} -> Reason;
@@ -652,35 +653,35 @@ assert_failure(Fun) ->
end.
stop_app(Node) ->
- control_action(stop_app, Node).
+ rabbit_control_helper:command(stop_app, Node).
start_app(Node) ->
- control_action(start_app, Node).
+ rabbit_control_helper:command(start_app, Node).
join_cluster(Node, To) ->
join_cluster(Node, To, false).
join_cluster(Node, To, Ram) ->
- control_action(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]).
+ rabbit_control_helper:command_with_output(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]).
reset(Node) ->
- control_action(reset, Node).
+ rabbit_control_helper:command(reset, Node).
force_reset(Node) ->
- control_action(force_reset, Node).
+ rabbit_control_helper:command(force_reset, Node).
forget_cluster_node(Node, Removee, RemoveWhenOffline) ->
- control_action(forget_cluster_node, Node, [atom_to_list(Removee)],
+ rabbit_control_helper:command(forget_cluster_node, Node, [atom_to_list(Removee)],
[{"--offline", RemoveWhenOffline}]).
forget_cluster_node(Node, Removee) ->
forget_cluster_node(Node, Removee, false).
change_cluster_node_type(Node, Type) ->
- control_action(change_cluster_node_type, Node, [atom_to_list(Type)]).
+ rabbit_control_helper:command(change_cluster_node_type, Node, [atom_to_list(Type)]).
update_cluster_nodes(Node, DiscoveryNode) ->
- control_action(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]).
+ rabbit_control_helper:command(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]).
stop_join_start(Node, ClusterTo, Ram) ->
ok = stop_app(Node),
@@ -695,17 +696,6 @@ stop_reset_start(Node) ->
ok = reset(Node),
ok = start_app(Node).
-control_action(Command, Node) ->
- control_action(Command, Node, [], []).
-
-control_action(Command, Node, Args) ->
- control_action(Command, Node, Args, []).
-
-control_action(Command, Node, Args, Opts) ->
- rpc:call(Node, rabbit_control_main, action,
- [Command, Node, Args, Opts,
- fun io:format/2]).
-
declare(Ch, Name) ->
Res = amqp_channel:call(Ch, #'queue.declare'{durable = true,
queue = Name}),
diff --git a/test/eager_sync_SUITE.erl b/test/eager_sync_SUITE.erl
index 93b308b6c5..70d7080269 100644
--- a/test/eager_sync_SUITE.erl
+++ b/test/eager_sync_SUITE.erl
@@ -23,7 +23,7 @@
-define(QNAME, <<"ha.two.test">>).
-define(QNAME_AUTO, <<"ha.auto.test">>).
--define(MESSAGE_COUNT, 2000).
+-define(MESSAGE_COUNT, 200000).
all() ->
[
@@ -135,9 +135,11 @@ eager_sync_cancel(Config) ->
amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME,
durable = true}),
{ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence
- eager_sync_cancel_test2(Config, A, B, C, Ch).
+ eager_sync_cancel_test2(Config, A, B, C, Ch, 100).
-eager_sync_cancel_test2(Config, A, B, C, Ch) ->
+eager_sync_cancel_test2(_, _, _, _, _, 0) ->
+ error(no_more_attempts_left);
+eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts) ->
%% Sync then cancel
rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
restart(Config, A),
@@ -158,12 +160,12 @@ eager_sync_cancel_test2(Config, A, B, C, Ch) ->
%% Damn. Syncing finished between wait_for_syncing/3 and
%% sync_cancel/2 above. Start again.
amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}),
- eager_sync_cancel_test2(Config, A, B, C, Ch)
+ eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1)
end;
synced_already ->
%% Damn. Syncing finished before wait_for_syncing/3. Start again.
amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}),
- eager_sync_cancel_test2(Config, A, B, C, Ch)
+ eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1)
end.
eager_sync_auto(Config) ->
@@ -240,8 +242,8 @@ wait_for_sync(Node, QName) ->
sync_detection_SUITE:wait_for_sync_status(true, Node, QName).
action(Node, Action, QName) ->
- rabbit_ct_broker_helpers:control_action(
- Action, Node, [binary_to_list(QName)], [{"-p", "/"}]).
+ rabbit_control_helper:command_with_output(
+ Action, Node, [binary_to_list(QName)], [{"-p", "/"}]).
queue(Node, QName) ->
QNameRes = rabbit_misc:r(<<"/">>, queue, QName),
@@ -273,6 +275,6 @@ state(Node, QName) ->
%% in order to pass, because a SyncBatchSize >= ?MESSAGE_COUNT will
%% always finish before the test is able to cancel the sync.
set_app_sync_batch_size(Node) ->
- rabbit_ct_broker_helpers:control_action(
+ rabbit_control_helper:command(
eval, Node,
["application:set_env(rabbit, mirroring_sync_batch_size, 1)."]).
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index 592e57c41a..efc5ca830e 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -795,7 +795,7 @@ set_vhost_connection_limit(Config, VHost, Count) ->
set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
Node = rabbit_ct_broker_helpers:get_node_config(
Config, NodeIndex, nodename),
- rabbit_ct_broker_helpers:control_action(
+ ok = rabbit_ct_broker_helpers:control_action(
set_vhost_limits, Node,
["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
[{"-p", binary_to_list(VHost)}]).
diff --git a/test/per_vhost_msg_store_SUITE.erl b/test/per_vhost_msg_store_SUITE.erl
new file mode 100644
index 0000000000..4d88c84b7e
--- /dev/null
+++ b/test/per_vhost_msg_store_SUITE.erl
@@ -0,0 +1,254 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(per_vhost_msg_store_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(MSGS_COUNT, 100).
+
+all() ->
+ [
+ publish_to_different_dirs,
+ storage_deleted_on_vhost_delete,
+ single_vhost_storage_delete_is_safe
+ ].
+
+
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ Config2 = rabbit_ct_helpers:merge_app_env(
+ Config1,
+ {rabbit, [{queue_index_embed_msgs_below, 100}]}),
+ rabbit_ct_helpers:run_setup_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(_, Config) ->
+ Vhost1 = <<"vhost1">>,
+ Vhost2 = <<"vhost2">>,
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost1),
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost2),
+ Chan1 = open_channel(Vhost1, Config),
+ Chan2 = open_channel(Vhost2, Config),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{vhost1, Vhost1}, {vhost2, Vhost2},
+ {channel1, Chan1}, {channel2, Chan2}]).
+
+end_per_testcase(single_vhost_storage_delete_is_safe, Config) ->
+ Config;
+end_per_testcase(_, Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost2),
+ Config.
+
+publish_to_different_dirs(Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ Channel1 = ?config(channel1, Config),
+ Channel2 = ?config(channel2, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ Queue2 = declare_durable_queue(Channel2),
+ FolderSize1 = get_folder_size(Vhost1, Config),
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a queue index
+ publish_persistent_messages(index, Channel1, Queue1),
+ %% First storage increased
+ FolderSize11 = get_folder_size(Vhost1, Config),
+ true = (FolderSize1 < FolderSize11),
+ %% Second storage didn't increase
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a message store
+ publish_persistent_messages(store, Channel1, Queue1),
+ %% First storage increased
+ FolderSize12 = get_folder_size(Vhost1, Config),
+ true = (FolderSize11 < FolderSize12),
+ %% Second storage didn't increase
+ FolderSize2 = get_folder_size(Vhost2, Config),
+
+ %% Publish message to a queue index
+ publish_persistent_messages(index, Channel2, Queue2),
+ %% Second storage increased
+ FolderSize21 = get_folder_size(Vhost2, Config),
+ true = (FolderSize2 < FolderSize21),
+ %% First storage didn't increase
+ FolderSize12 = get_folder_size(Vhost1, Config),
+
+ %% Publish message to a message store
+ publish_persistent_messages(store, Channel2, Queue2),
+ %% Second storage increased
+ FolderSize22 = get_folder_size(Vhost2, Config),
+ true = (FolderSize21 < FolderSize22),
+ %% First storage didn't increase
+ FolderSize12 = get_folder_size(Vhost1, Config).
+
+storage_deleted_on_vhost_delete(Config) ->
+ Vhost1 = ?config(vhost1, Config),
+ Channel1 = ?config(channel1, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ FolderSize = get_global_folder_size(Config),
+
+ publish_persistent_messages(index, Channel1, Queue1),
+ publish_persistent_messages(store, Channel1, Queue1),
+ FolderSizeAfterPublish = get_global_folder_size(Config),
+
+ %% Total storage size increased
+ true = (FolderSize < FolderSizeAfterPublish),
+
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost1),
+
+ %% Total storage size reduced
+ FolderSizeAfterDelete = get_global_folder_size(Config),
+ true = (FolderSizeAfterPublish > FolderSizeAfterDelete),
+
+ %% There is no Vhost1 folder
+ 0 = get_folder_size(Vhost1, Config).
+
+
+single_vhost_storage_delete_is_safe(Config) ->
+ct:pal("Start test 3", []),
+ Vhost1 = ?config(vhost1, Config),
+ Vhost2 = ?config(vhost2, Config),
+ Channel1 = ?config(channel1, Config),
+ Channel2 = ?config(channel2, Config),
+ Queue1 = declare_durable_queue(Channel1),
+ Queue2 = declare_durable_queue(Channel2),
+
+ %% Publish messages to both stores
+ publish_persistent_messages(index, Channel1, Queue1),
+ publish_persistent_messages(store, Channel1, Queue1),
+ publish_persistent_messages(index, Channel2, Queue2),
+ publish_persistent_messages(store, Channel2, Queue2),
+
+ queue_is_not_empty(Channel2, Queue2),
+ % Vhost2Dir = vhost_dir(Vhost2, Config),
+ % [StoreFile] = filelib:wildcard(binary_to_list(filename:join([Vhost2Dir, "msg_store_persistent_*", "0.rdq"]))),
+ % ct:pal("Store file ~p~n", [file:read_file(StoreFile)]).
+% ok.
+ rabbit_ct_broker_helpers:stop_broker(Config, 0),
+ delete_vhost_data(Vhost1, Config),
+ rabbit_ct_broker_helpers:start_broker(Config, 0),
+
+ Channel11 = open_channel(Vhost1, Config),
+ Channel21 = open_channel(Vhost2, Config),
+
+ %% There are no Vhost1 messages
+ queue_is_empty(Channel11, Queue1),
+
+ %% But Vhost2 messages are in place
+ queue_is_not_empty(Channel21, Queue2),
+ consume_messages(index, Channel21, Queue2),
+ consume_messages(store, Channel21, Queue2).
+
+declare_durable_queue(Channel) ->
+ QName = list_to_binary(erlang:ref_to_list(make_ref())),
+ #'queue.declare_ok'{queue = QName} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{queue = QName, durable = true}),
+ QName.
+
+publish_persistent_messages(Storage, Channel, Queue) ->
+ MessagePayload = case Storage of
+ index -> binary:copy(<<"=">>, 50);
+ store -> binary:copy(<<"-">>, 150)
+ end,
+ amqp_channel:call(Channel, #'confirm.select'{}),
+ [amqp_channel:call(Channel,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = MessagePayload})
+ || _ <- lists:seq(1, ?MSGS_COUNT)],
+ amqp_channel:wait_for_confirms(Channel).
+
+
+get_folder_size(Vhost, Config) ->
+ Dir = vhost_dir(Vhost, Config),
+ folder_size(Dir).
+
+folder_size(Dir) ->
+ filelib:fold_files(Dir, ".*", true,
+ fun(F,Acc) -> filelib:file_size(F) + Acc end, 0).
+
+get_global_folder_size(Config) ->
+ BaseDir = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_mnesia, dir, []),
+ folder_size(BaseDir).
+
+vhost_dir(Vhost, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_vhost, msg_store_dir_path, [Vhost]).
+
+delete_vhost_data(Vhost, Config) ->
+ Dir = vhost_dir(Vhost, Config),
+ rabbit_file:recursive_delete([Dir]).
+
+queue_is_empty(Channel, Queue) ->
+ #'queue.declare_ok'{queue = Queue, message_count = 0} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{ queue = Queue,
+ durable = true,
+ passive = true}).
+
+queue_is_not_empty(Channel, Queue) ->
+ #'queue.declare_ok'{queue = Queue, message_count = MsgCount} =
+ amqp_channel:call(Channel,
+ #'queue.declare'{ queue = Queue,
+ durable = true,
+ passive = true}),
+ ExpectedCount = ?MSGS_COUNT * 2,
+ ExpectedCount = MsgCount.
+
+consume_messages(Storage, Channel, Queue) ->
+ MessagePayload = case Storage of
+ index -> binary:copy(<<"=">>, 50);
+ store -> binary:copy(<<"-">>, 150)
+ end,
+ lists:foreach(
+ fun(I) ->
+ ct:pal("Consume message ~p~n ~p~n", [I, MessagePayload]),
+ {#'basic.get_ok'{}, Content} =
+ amqp_channel:call(Channel,
+ #'basic.get'{queue = Queue,
+ no_ack = true}),
+ #amqp_msg{payload = MessagePayload} = Content
+ end,
+ lists:seq(1, ?MSGS_COUNT)),
+ ok.
+
+open_channel(Vhost, Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, Conn} = amqp_connection:start(
+ #amqp_params_direct{node = Node, virtual_host = Vhost}),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Chan.
diff --git a/test/plugins_SUITE.erl b/test/plugins_SUITE.erl
deleted file mode 100644
index 8896298df1..0000000000
--- a/test/plugins_SUITE.erl
+++ /dev/null
@@ -1,80 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(plugins_SUITE).
-
--include_lib("common_test/include/ct.hrl").
-
--compile(export_all).
-
-all() ->
- [
- active_with_single_plugin_dir,
- active_with_multiple_plugin_dirs
- ].
-
-%% -------------------------------------------------------------------
-%% Testsuite setup/teardown.
-%% -------------------------------------------------------------------
-
-init_per_suite(Config) ->
- rabbit_ct_helpers:log_environment(),
- application:load(rabbit),
- rabbit_ct_helpers:run_setup_steps(Config).
-
-end_per_suite(Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config).
-
-init_per_group(_, Config) ->
- Config.
-
-end_per_group(_, Config) ->
- Config.
-
-init_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_started(Config, Testcase).
-
-end_per_testcase(Testcase, Config) ->
- rabbit_ct_helpers:testcase_finished(Config, Testcase).
-
-%% -------------------------------------------------------------------
-%% Testcases.
-%% -------------------------------------------------------------------
-
-active_with_single_plugin_dir(Config) ->
- DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
- PluginsDir1 = filename:join(DataDir, "plugins1"),
-
- true = code:add_path(filename:join([PluginsDir1,
- "mock_rabbitmq_plugins_01-0.1.0.ez",
- "mock_rabbitmq_plugins_01-0.1.0", "ebin"])),
- {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01),
- application:set_env(rabbit, plugins_dir, PluginsDir1),
-
- [mock_rabbitmq_plugins_01] = rabbit_plugins:active().
-
-active_with_multiple_plugin_dirs(Config) ->
- DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
- PluginsDir1 = filename:join(DataDir, "plugins1"),
- PluginsDir2 = filename:join(DataDir, "plugins2"),
-
- true = code:add_path(filename:join([PluginsDir1,
- "mock_rabbitmq_plugins_01-0.1.0.ez",
- "mock_rabbitmq_plugins_01-0.1.0", "ebin"])),
- {ok, _} = application:ensure_all_started(mock_rabbitmq_plugins_01),
- application:set_env(rabbit, plugins_dir, PluginsDir1 ++ ":" ++ PluginsDir2),
-
- [mock_rabbitmq_plugins_01] = rabbit_plugins:active().
diff --git a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez b/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez
deleted file mode 100644
index 40cba9f16b..0000000000
--- a/test/plugins_SUITE_data/plugins1/mock_rabbitmq_plugins_01-0.1.0.ez
+++ /dev/null
Binary files differ
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index ef85472f48..bbc4447102 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,7 +31,6 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
- ,manage_global_parameters/1
]).
all() ->
@@ -46,8 +45,7 @@ groups() ->
[list_queues_local
,list_queues_online
,list_queues_offline
- ]},
- {global_parameters, [], [manage_global_parameters]}
+ ]}
].
init_per_suite(Config) ->
@@ -63,13 +61,6 @@ init_per_group(list_queues, Config0) ->
Config1 = declare_some_queues(Config),
rabbit_ct_broker_helpers:stop_node(Config1, NumNodes - 1),
Config1;
-init_per_group(global_parameters,Config) ->
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, ?MODULE}
- ]),
- rabbit_ct_helpers:run_setup_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
init_per_group(_, Config) ->
Config.
@@ -144,75 +135,6 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
-manage_global_parameters(Config) ->
- 0 = length(global_parameters(Config)),
- Parameter1Key = global_param1,
- GlobalParameter1ValueAsString = "{\"a\":\"b\", \"c\":\"d\"}",
- ok = control_action(Config, set_global_parameter,
- [atom_to_list(Parameter1Key),
- GlobalParameter1ValueAsString
- ]),
-
- 1 = length(global_parameters(Config)),
-
- GlobalParameter1Value = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_runtime_parameters, value_global,
- [Parameter1Key]
- ),
-
- [{<<"a">>,<<"b">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value,
-
- Parameter2Key = global_param2,
- GlobalParameter2ValueAsString = "{\"e\":\"f\", \"g\":\"h\"}",
- ok = control_action(Config, set_global_parameter,
- [atom_to_list(Parameter2Key),
- GlobalParameter2ValueAsString
- ]),
-
- 2 = length(global_parameters(Config)),
-
- GlobalParameter2Value = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_runtime_parameters, value_global,
- [Parameter2Key]
- ),
-
- [{<<"e">>,<<"f">>}, {<<"g">>,<<"h">>}] = GlobalParameter2Value,
-
-
- GlobalParameter1Value2AsString = "{\"a\":\"z\", \"c\":\"d\"}",
- ok = control_action(Config, set_global_parameter,
- [atom_to_list(Parameter1Key),
- GlobalParameter1Value2AsString
- ]),
-
- 2 = length(global_parameters(Config)),
-
- GlobalParameter1Value2 = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_runtime_parameters, value_global,
- [Parameter1Key]
- ),
-
- [{<<"a">>,<<"z">>}, {<<"c">>,<<"d">>}] = GlobalParameter1Value2,
-
- ok = control_action(Config, clear_global_parameter,
- [atom_to_list(Parameter1Key)]
- ),
-
- 1 = length(global_parameters(Config)),
-
- not_found = rabbit_ct_broker_helpers:rpc(
- Config, 0,
- rabbit_runtime_parameters, value_global,
- [Parameter1Key]
- ),
-
- ok = control_action(Config, list_global_parameters, []),
-
- ok.
-
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
@@ -234,11 +156,7 @@ run_list_queues(Config, Node, Args) ->
control_action(Config, Command, Args) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- rabbit_control_main:action(
- Command, Node, Args, [],
- fun (Format, Args1) ->
- io:format(Format ++ " ...~n", Args1)
- end).
+ rabbit_ct_broker_helpers:control_action(Command, Node, Args, []).
global_parameters(Config) ->
rabbit_ct_broker_helpers:rpc(
diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl
index 1e0a66e8fd..3e5ed8208b 100644
--- a/test/sync_detection_SUITE.erl
+++ b/test/sync_detection_SUITE.erl
@@ -210,7 +210,7 @@ slave_pids(Node, Queue) ->
_ -> Pids
end.
-%% The mnesia syncronization takes a while, but we don't want to wait for the
+%% The mnesia synchronization takes a while, but we don't want to wait for the
%% test to fail, since the timetrap is quite high.
wait_for_sync_status(Status, Node, Queue) ->
Max = 10000 / ?LOOP_RECURSION_DELAY,
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index a7f2ef4603..af86371fc2 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -22,8 +22,8 @@
-compile(export_all).
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
@@ -70,7 +70,8 @@
all() ->
[
{group, parallel_tests},
- {group, non_parallel_tests},
+ {group, non_parallel_tests}
+ ,
{group, backing_queue_tests},
{group, cluster_tests},
@@ -88,24 +89,12 @@ groups() ->
credit_flow_settings,
dynamic_mirroring,
gen_server2_with_state,
- list_operations_timeout_pass,
mcall,
{password_hashing, [], [
password_hashing,
change_password
]},
- {policy_validation, [parallel, {repeat, 20}], [
- ha_policy_validation,
- policy_validation,
- policy_opts_validation,
- queue_master_location_policy_validation,
- queue_modes_policy_validation,
- vhost_removed_while_updating_policy
- ]},
- runtime_parameters,
- set_disk_free_limit_command,
- topic_matching,
- user_management
+ topic_matching
]},
{non_parallel_tests, [], [
app_management, %% Restart RabbitMQ.
@@ -115,9 +104,7 @@ groups() ->
head_message_timestamp_statistics, %% Expect specific statistics.
log_management, %% Check log files.
log_management_during_startup, %% Check log files.
- memory_high_watermark, %% Trigger alarm.
- externally_rotated_logs_are_automatically_reopened, %% Check log files.
- server_status %% Trigger alarm.
+ externally_rotated_logs_are_automatically_reopened %% Check log files.
]},
{backing_queue_tests, [], [
msg_store,
@@ -265,25 +252,41 @@ app_management(Config) ->
?MODULE, app_management1, [Config]).
app_management1(_Config) ->
- control_action(wait, [os:getenv("RABBITMQ_PID_FILE")]),
+ wait_for_application(rabbit),
%% Starting, stopping and diagnostics. Note that we don't try
%% 'report' when the rabbit app is stopped and that we enable
%% tracing for the duration of this function.
- ok = control_action(trace_on, []),
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, []),
- ok = control_action(status, []),
- ok = control_action(cluster_status, []),
- ok = control_action(environment, []),
- ok = control_action(start_app, []),
- ok = control_action(start_app, []),
- ok = control_action(status, []),
- ok = control_action(report, []),
- ok = control_action(cluster_status, []),
- ok = control_action(environment, []),
- ok = control_action(trace_off, []),
+ ok = rabbit_trace:start(<<"/">>),
+ ok = rabbit:stop(),
+ ok = rabbit:stop(),
+ ok = no_exceptions(rabbit, status, []),
+ ok = no_exceptions(rabbit, environment, []),
+ ok = rabbit:start(),
+ ok = rabbit:start(),
+ ok = no_exceptions(rabbit, status, []),
+ ok = no_exceptions(rabbit, environment, []),
+ ok = rabbit_trace:stop(<<"/">>),
passed.
+no_exceptions(Mod, Fun, Args) ->
+ try erlang:apply(Mod, Fun, Args) of _ -> ok
+ catch Type:Ex -> {Type, Ex}
+ end.
+
+wait_for_application(Application) ->
+ wait_for_application(Application, 5000).
+
+wait_for_application(_, Time) when Time =< 0 ->
+ {error, timeout};
+wait_for_application(Application, Time) ->
+ Interval = 100,
+ case lists:keyfind(Application, 1, application:which_applications()) of
+ false ->
+ timer:sleep(Interval),
+ wait_for_application(Application, Time - Interval);
+ _ -> ok
+ end.
+
%% -------------------------------------------------------------------
%% Message store.
%% -------------------------------------------------------------------
@@ -347,7 +350,7 @@ msg_store1(_Config) ->
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
- [], {fun ([]) -> finished;
+ #{}, {fun ([]) -> finished;
([MsgId|MsgIdsTail])
when length(MsgIdsTail) rem 2 == 0 ->
{MsgId, 1, MsgIdsTail};
@@ -468,10 +471,10 @@ on_disk_stop(Pid) ->
msg_store_client_init_capture(MsgStore, Ref) ->
Pid = spawn(fun on_disk_capture/0),
- {Pid, rabbit_msg_store:client_init(
+ {Pid, rabbit_msg_store_vhost_sup:client_init(
MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
Pid ! {on_disk, MsgIds}
- end, undefined)}.
+ end, undefined, <<"/">>)}.
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
@@ -548,14 +551,14 @@ test_msg_store_confirm_timer() ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
- MSCState = rabbit_msg_store:client_init(
+ MSCState = rabbit_msg_store_vhost_sup:client_init(
?PERSISTENT_MSG_STORE, Ref,
fun (MsgIds, _ActionTaken) ->
case gb_sets:is_member(MsgId, MsgIds) of
true -> Self ! on_disk;
false -> ok
end
- end, undefined),
+ end, undefined, <<"/">>),
ok = msg_store_write([MsgId], MSCState),
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
@@ -1424,7 +1427,7 @@ nop(_) -> ok.
nop(_, _) -> ok.
msg_store_client_init(MsgStore, Ref) ->
- rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+ rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
@@ -1842,7 +1845,7 @@ log_management(Config) ->
?MODULE, log_management1, [Config]).
log_management1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
ok = test_logs_working([LogFile]),
@@ -1852,7 +1855,7 @@ log_management1(_Config) ->
ok = test_logs_working([LogFile]),
%% simple log rotation
- ok = control_action(rotate_logs, []),
+ ok = rabbit:rotate_logs(),
%% FIXME: rabbit:rotate_logs/0 is asynchronous due to a limitation
%% in Lager. Therefore, we have no choice but to wait an arbitrary
%% amount of time.
@@ -1862,53 +1865,53 @@ log_management1(_Config) ->
%% log rotation on empty files
ok = clean_logs([LogFile], Suffix),
- ok = control_action(rotate_logs, []),
+ ok = rabbit:rotate_logs(),
timer:sleep(2000),
[{error, enoent}, true] = non_empty_files([LogFile ++ Suffix, LogFile]),
%% logs with suffix are not writable
- ok = control_action(rotate_logs, []),
+ ok = rabbit:rotate_logs(),
timer:sleep(2000),
ok = make_files_non_writable([LogFile ++ Suffix]),
- ok = control_action(rotate_logs, []),
+ ok = rabbit:rotate_logs(),
timer:sleep(2000),
ok = test_logs_working([LogFile]),
%% rotate when original log files are not writable
ok = make_files_non_writable([LogFile]),
- ok = control_action(rotate_logs, []),
+ ok = rabbit:rotate_logs(),
timer:sleep(2000),
%% logging directed to tty (first, remove handlers)
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = clean_logs([LogFile], Suffix),
ok = application:set_env(rabbit, lager_handler, tty),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
timer:sleep(200),
rabbit_log:info("test info"),
[{error, enoent}] = empty_files([LogFile]),
%% rotate logs when logging is turned off
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = clean_logs([LogFile], Suffix),
ok = application:set_env(rabbit, lager_handler, false),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
timer:sleep(200),
rabbit_log:error("test error"),
timer:sleep(200),
[{error, enoent}] = empty_files([LogFile]),
%% cleanup
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = clean_logs([LogFile], Suffix),
ok = application:set_env(rabbit, lager_handler, LogFile),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
ok = test_logs_working([LogFile]),
passed.
@@ -1917,84 +1920,80 @@ log_management_during_startup(Config) ->
?MODULE, log_management_during_startup1, [Config]).
log_management_during_startup1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
%% start application with simple tty logging
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = clean_logs([LogFile], Suffix),
ok = application:set_env(rabbit, lager_handler, tty),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
%% start application with logging to non-existing directory
NonExistent = "/tmp/non-existent/test.log",
delete_file(NonExistent),
delete_file(filename:dirname(NonExistent)),
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = application:set_env(rabbit, lager_handler, NonExistent),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
%% start application with logging to directory with no
%% write permissions
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
NoPermission1 = "/var/empty/test.log",
delete_file(NoPermission1),
delete_file(filename:dirname(NoPermission1)),
- ok = control_action(stop_app, []),
+ ok = rabbit:stop(),
ok = application:set_env(rabbit, lager_handler, NoPermission1),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = case control_action(start_app, []) of
- ok -> exit({got_success_but_expected_failure,
- log_rotation_no_write_permission_dir_test});
- {badrpc,
- {'EXIT', {error, {cannot_log_to_file, _, Reason1}}}}
- when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok;
- {badrpc,
- {'EXIT',
- {error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason1}}}}}
- when Reason1 =:= eperm orelse
- Reason1 =:= eacces orelse
- Reason1 =:= enoent-> ok
- end,
+ ok = try rabbit:start() of
+ ok -> exit({got_success_but_expected_failure,
+ log_rotation_no_write_permission_dir_test})
+ catch
+ _:{error, {cannot_log_to_file, _, Reason1}}
+ when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok;
+ _:{error, {cannot_log_to_file, _,
+ {cannot_create_parent_dirs, _, Reason1}}}
+ when Reason1 =:= eperm orelse
+ Reason1 =:= eacces orelse
+ Reason1 =:= enoent-> ok
+ end,
%% start application with logging to a subdirectory which
%% parent directory has no write permissions
NoPermission2 = "/var/empty/non-existent/test.log",
delete_file(NoPermission2),
delete_file(filename:dirname(NoPermission2)),
- case control_action(stop_app, []) of
+ case rabbit:stop() of
ok -> ok;
{error, lager_not_running} -> ok
end,
ok = application:set_env(rabbit, lager_handler, NoPermission2),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = case control_action(start_app, []) of
- ok -> exit({got_success_but_expected_failure,
- log_rotatation_parent_dirs_test});
- {badrpc,
- {'EXIT', {error, {cannot_log_to_file, _, Reason2}}}}
- when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok;
- {badrpc,
- {'EXIT',
- {error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason2}}}}}
- when Reason2 =:= eperm orelse
- Reason2 =:= eacces orelse
- Reason2 =:= enoent-> ok
- end,
+ ok = try rabbit:start() of
+ ok -> exit({got_success_but_expected_failure,
+ log_rotatation_parent_dirs_test})
+ catch
+ _:{error, {cannot_log_to_file, _, Reason2}}
+ when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok;
+ _:{error, {cannot_log_to_file, _,
+ {cannot_create_parent_dirs, _, Reason2}}}
+ when Reason2 =:= eperm orelse
+ Reason2 =:= eacces orelse
+ Reason2 =:= enoent-> ok
+ end,
%% cleanup
ok = application:set_env(rabbit, lager_handler, LogFile),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = control_action(start_app, []),
+ ok = rabbit:start(),
passed.
externally_rotated_logs_are_automatically_reopened(Config) ->
@@ -2002,7 +2001,7 @@ externally_rotated_logs_are_automatically_reopened(Config) ->
?MODULE, externally_rotated_logs_are_automatically_reopened1, [Config]).
externally_rotated_logs_are_automatically_reopened1(_Config) ->
- [LogFile] = rabbit:log_locations(),
+ [LogFile|_] = rabbit:log_locations(),
%% Make sure log file is opened
ok = test_logs_working([LogFile]),
@@ -2185,599 +2184,6 @@ change_password1(_Config) ->
%% rabbitmqctl.
%% -------------------------------------------------------------------
-list_operations_timeout_pass(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, list_operations_timeout_pass1, [Config]).
-
-list_operations_timeout_pass1(Config) ->
- %% create a few things so there is some useful information to list
- {_Writer1, Limiter1, Ch1} = rabbit_ct_broker_helpers:test_channel(),
- {_Writer2, Limiter2, Ch2} = rabbit_ct_broker_helpers:test_channel(),
-
- [Q, Q2] = [Queue || Name <- [<<"list_operations_timeout_pass-q1">>,
- <<"list_operations_timeout_pass-q2">>],
- {new, Queue = #amqqueue{}} <-
- [rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], none)]],
-
- ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch1, Limiter1, false, 0, <<"ctag1">>, true, [],
- undefined),
- ok = rabbit_amqqueue:basic_consume(
- Q2, true, Ch2, Limiter2, false, 0, <<"ctag2">>, true, [],
- undefined),
-
- %% list users
- ok = control_action(add_user,
- ["list_operations_timeout_pass-user",
- "list_operations_timeout_pass-password"]),
- {error, {user_already_exists, _}} =
- control_action(add_user,
- ["list_operations_timeout_pass-user",
- "list_operations_timeout_pass-password"]),
- ok = control_action_t(list_users, [], ?TIMEOUT_LIST_OPS_PASS),
-
- %% list parameters
- ok = dummy_runtime_parameters:register(),
- ok = control_action(set_parameter, ["test", "good", "123"]),
- ok = control_action_t(list_parameters, [], ?TIMEOUT_LIST_OPS_PASS),
- ok = control_action(clear_parameter, ["test", "good"]),
- dummy_runtime_parameters:unregister(),
-
- %% list vhosts
- ok = control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]),
- {error, {vhost_already_exists, _}} =
- control_action(add_vhost, ["/list_operations_timeout_pass-vhost"]),
- ok = control_action_t(list_vhosts, [], ?TIMEOUT_LIST_OPS_PASS),
-
- %% list permissions
- ok = control_action(set_permissions,
- ["list_operations_timeout_pass-user", ".*", ".*", ".*"],
- [{"-p", "/list_operations_timeout_pass-vhost"}]),
- ok = control_action_t(list_permissions, [],
- [{"-p", "/list_operations_timeout_pass-vhost"}],
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list user permissions
- ok = control_action_t(list_user_permissions,
- ["list_operations_timeout_pass-user"],
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list policies
- ok = control_action_opts(
- ["set_policy", "list_operations_timeout_pass-policy", ".*",
- "{\"ha-mode\":\"all\"}"]),
- ok = control_action_t(list_policies, [], ?TIMEOUT_LIST_OPS_PASS),
- ok = control_action(clear_policy, ["list_operations_timeout_pass-policy"]),
-
- %% list queues
- ok = info_action_t(list_queues,
- rabbit_amqqueue:info_keys(), false,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list exchanges
- ok = info_action_t(list_exchanges,
- rabbit_exchange:info_keys(), true,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list bindings
- ok = info_action_t(list_bindings,
- rabbit_binding:info_keys(), true,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list connections
- H = ?config(rmq_hostname, Config),
- P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
- {ok, C1} = gen_tcp:connect(H, P, [binary, {active, false}]),
- gen_tcp:send(C1, <<"AMQP", 0, 0, 9, 1>>),
- {ok, <<1,0,0>>} = gen_tcp:recv(C1, 3, 100),
-
- {ok, C2} = gen_tcp:connect(H, P, [binary, {active, false}]),
- gen_tcp:send(C2, <<"AMQP", 0, 0, 9, 1>>),
- {ok, <<1,0,0>>} = gen_tcp:recv(C2, 3, 100),
-
- ok = info_action_t(
- list_connections, rabbit_networking:connection_info_keys(), false,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list consumers
- ok = info_action_t(
- list_consumers, rabbit_amqqueue:consumer_info_keys(), false,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% list channels
- ok = info_action_t(
- list_channels, rabbit_channel:info_keys(), false,
- ?TIMEOUT_LIST_OPS_PASS),
-
- %% do some cleaning up
- ok = control_action(delete_user, ["list_operations_timeout_pass-user"]),
- {error, {no_such_user, _}} =
- control_action(delete_user, ["list_operations_timeout_pass-user"]),
-
- ok = control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]),
- {error, {no_such_vhost, _}} =
- control_action(delete_vhost, ["/list_operations_timeout_pass-vhost"]),
-
- %% close_connection
- Conns = rabbit_ct_broker_helpers:get_connection_pids([C1, C2]),
- [ok, ok] = [ok = control_action(
- close_connection, [rabbit_misc:pid_to_string(ConnPid), "go away"])
- || ConnPid <- Conns],
-
- %% cleanup queues
- [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
-
- [begin
- unlink(Chan),
- ok = rabbit_channel:shutdown(Chan)
- end || Chan <- [Ch1, Ch2]],
- passed.
-
-user_management(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, user_management1, [Config]).
-
-user_management1(_Config) ->
-
- %% lots if stuff that should fail
- {error, {no_such_user, _}} =
- control_action(delete_user,
- ["user_management-user"]),
- {error, {no_such_user, _}} =
- control_action(change_password,
- ["user_management-user", "user_management-password"]),
- {error, {no_such_vhost, _}} =
- control_action(delete_vhost,
- ["/user_management-vhost"]),
- {error, {no_such_user, _}} =
- control_action(set_permissions,
- ["user_management-user", ".*", ".*", ".*"]),
- {error, {no_such_user, _}} =
- control_action(clear_permissions,
- ["user_management-user"]),
- {error, {no_such_user, _}} =
- control_action(list_user_permissions,
- ["user_management-user"]),
- {error, {no_such_vhost, _}} =
- control_action(list_permissions, [],
- [{"-p", "/user_management-vhost"}]),
- {error, {invalid_regexp, _, _}} =
- control_action(set_permissions,
- ["guest", "+foo", ".*", ".*"]),
- {error, {no_such_user, _}} =
- control_action(set_user_tags,
- ["user_management-user", "bar"]),
-
- %% user creation
- ok = control_action(add_user,
- ["user_management-user", "user_management-password"]),
- {error, {user_already_exists, _}} =
- control_action(add_user,
- ["user_management-user", "user_management-password"]),
- ok = control_action(clear_password,
- ["user_management-user"]),
- ok = control_action(change_password,
- ["user_management-user", "user_management-newpassword"]),
-
- TestTags = fun (Tags) ->
- Args = ["user_management-user" | [atom_to_list(T) || T <- Tags]],
- ok = control_action(set_user_tags, Args),
- {ok, #internal_user{tags = Tags}} =
- rabbit_auth_backend_internal:lookup_user(
- <<"user_management-user">>),
- ok = control_action(list_users, [])
- end,
- TestTags([foo, bar, baz]),
- TestTags([administrator]),
- TestTags([]),
-
- %% user authentication
- ok = control_action(authenticate_user,
- ["user_management-user", "user_management-newpassword"]),
- {refused, _User, _Format, _Params} =
- control_action(authenticate_user,
- ["user_management-user", "user_management-password"]),
-
- %% vhost creation
- ok = control_action(add_vhost,
- ["/user_management-vhost"]),
- {error, {vhost_already_exists, _}} =
- control_action(add_vhost,
- ["/user_management-vhost"]),
- ok = control_action(list_vhosts, []),
-
- %% user/vhost mapping
- ok = control_action(set_permissions,
- ["user_management-user", ".*", ".*", ".*"],
- [{"-p", "/user_management-vhost"}]),
- ok = control_action(set_permissions,
- ["user_management-user", ".*", ".*", ".*"],
- [{"-p", "/user_management-vhost"}]),
- ok = control_action(set_permissions,
- ["user_management-user", ".*", ".*", ".*"],
- [{"-p", "/user_management-vhost"}]),
- ok = control_action(list_permissions, [],
- [{"-p", "/user_management-vhost"}]),
- ok = control_action(list_permissions, [],
- [{"-p", "/user_management-vhost"}]),
- ok = control_action(list_user_permissions,
- ["user_management-user"]),
-
- %% user/vhost unmapping
- ok = control_action(clear_permissions,
- ["user_management-user"], [{"-p", "/user_management-vhost"}]),
- ok = control_action(clear_permissions,
- ["user_management-user"], [{"-p", "/user_management-vhost"}]),
-
- %% vhost deletion
- ok = control_action(delete_vhost,
- ["/user_management-vhost"]),
- {error, {no_such_vhost, _}} =
- control_action(delete_vhost,
- ["/user_management-vhost"]),
-
- %% deleting a populated vhost
- ok = control_action(add_vhost,
- ["/user_management-vhost"]),
- ok = control_action(set_permissions,
- ["user_management-user", ".*", ".*", ".*"],
- [{"-p", "/user_management-vhost"}]),
- {new, _} = rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/user_management-vhost">>, queue,
- <<"user_management-vhost-queue">>),
- true, false, [], none),
- ok = control_action(delete_vhost,
- ["/user_management-vhost"]),
-
- %% user deletion
- ok = control_action(delete_user,
- ["user_management-user"]),
- {error, {no_such_user, _}} =
- control_action(delete_user,
- ["user_management-user"]),
-
- passed.
-
-runtime_parameters(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, runtime_parameters1, [Config]).
-
-runtime_parameters1(_Config) ->
- dummy_runtime_parameters:register(),
- Good = fun(L) -> ok = control_action(set_parameter, L) end,
- Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
-
- %% Acceptable for bijection
- Good(["test", "good", "\"ignore\""]),
- Good(["test", "good", "123"]),
- Good(["test", "good", "true"]),
- Good(["test", "good", "false"]),
- Good(["test", "good", "null"]),
- Good(["test", "good", "{\"key\": \"value\"}"]),
-
- %% Invalid json
- Bad(["test", "good", "atom"]),
- Bad(["test", "good", "{\"foo\": \"bar\""]),
- Bad(["test", "good", "{foo: \"bar\"}"]),
-
- %% Test actual validation hook
- Good(["test", "maybe", "\"good\""]),
- Bad(["test", "maybe", "\"bad\""]),
- Good(["test", "admin", "\"ignore\""]), %% ctl means 'user' -> none
-
- ok = control_action(list_parameters, []),
-
- ok = control_action(clear_parameter, ["test", "good"]),
- ok = control_action(clear_parameter, ["test", "maybe"]),
- ok = control_action(clear_parameter, ["test", "admin"]),
- {error_string, _} =
- control_action(clear_parameter, ["test", "neverexisted"]),
-
- %% We can delete for a component that no longer exists
- Good(["test", "good", "\"ignore\""]),
- dummy_runtime_parameters:unregister(),
- ok = control_action(clear_parameter, ["test", "good"]),
- passed.
-
-policy_validation(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, policy_validation1, [Config]).
-
-policy_validation1(_Config) ->
- PolicyName = "runtime_parameters-policy",
- dummy_runtime_parameters:register_policy_validator(),
- SetPol = fun (Key, Val) ->
- control_action_opts(
- ["set_policy", PolicyName, ".*",
- rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
- end,
- OK = fun (Key, Val) ->
- ok = SetPol(Key, Val),
- true = does_policy_exist(PolicyName,
- [{definition, [{list_to_binary(Key), Val}]}])
- end,
-
- OK("testeven", []),
- OK("testeven", [1, 2]),
- OK("testeven", [1, 2, 3, 4]),
- OK("testpos", [2, 5, 5678]),
-
- {error_string, _} = SetPol("testpos", [-1, 0, 1]),
- {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
-
- ok = control_action(clear_policy, [PolicyName]),
- dummy_runtime_parameters:unregister_policy_validator(),
- passed.
-
-policy_opts_validation(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, policy_opts_validation1, [Config]).
-
-policy_opts_validation1(_Config) ->
- PolicyName = "policy_opts_validation-policy",
- Set = fun (Extra) -> control_action_opts(
- ["set_policy", PolicyName,
- ".*", "{\"ha-mode\":\"all\"}"
- | Extra]) end,
- OK = fun (Extra, Props) ->
- ok = Set(Extra),
- true = does_policy_exist(PolicyName, Props)
- end,
- Fail = fun (Extra) ->
- case Set(Extra) of
- {error_string, _} -> ok;
- no_command when Extra =:= ["--priority"] -> ok;
- no_command when Extra =:= ["--apply-to"] -> ok;
- {'EXIT',
- {function_clause,
- [{rabbit_control_main,action, _, _} | _]}}
- when Extra =:= ["--offline"] -> ok
- end
- end,
-
- OK ([], [{priority, 0}, {'apply-to', <<"all">>}]),
-
- OK (["--priority", "0"], [{priority, 0}]),
- OK (["--priority", "3"], [{priority, 3}]),
- Fail(["--priority", "banana"]),
- Fail(["--priority"]),
-
- OK (["--apply-to", "all"], [{'apply-to', <<"all">>}]),
- OK (["--apply-to", "queues"], [{'apply-to', <<"queues">>}]),
- Fail(["--apply-to", "bananas"]),
- Fail(["--apply-to"]),
-
- OK (["--priority", "3", "--apply-to", "queues"], [{priority, 3}, {'apply-to', <<"queues">>}]),
- Fail(["--priority", "banana", "--apply-to", "queues"]),
- Fail(["--priority", "3", "--apply-to", "bananas"]),
-
- Fail(["--offline"]),
-
- ok = control_action(clear_policy, [PolicyName]),
- passed.
-
-ha_policy_validation(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, ha_policy_validation1, [Config]).
-
-ha_policy_validation1(_Config) ->
- PolicyName = "ha_policy_validation-policy",
- Set = fun (JSON) -> control_action_opts(
- ["set_policy", PolicyName,
- ".*", JSON]) end,
- OK = fun (JSON, Def) ->
- ok = Set(JSON),
- true = does_policy_exist(PolicyName, [{definition, Def}])
- end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
-
- OK ("{\"ha-mode\":\"all\"}", [{<<"ha-mode">>, <<"all">>}]),
- Fail("{\"ha-mode\":\"made_up\"}"),
-
- Fail("{\"ha-mode\":\"nodes\"}"),
- Fail("{\"ha-mode\":\"nodes\",\"ha-params\":2}"),
- Fail("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",2]}"),
- OK ("{\"ha-mode\":\"nodes\",\"ha-params\":[\"a\",\"b\"]}",
- [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [<<"a">>, <<"b">>]}]),
- Fail("{\"ha-params\":[\"a\",\"b\"]}"),
-
- Fail("{\"ha-mode\":\"exactly\"}"),
- Fail("{\"ha-mode\":\"exactly\",\"ha-params\":[\"a\",\"b\"]}"),
- OK ("{\"ha-mode\":\"exactly\",\"ha-params\":2}",
- [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}]),
- Fail("{\"ha-params\":2}"),
-
- OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"manual\"}",
- [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"manual">>}]),
- OK ("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"automatic\"}",
- [{<<"ha-mode">>, <<"all">>}, {<<"ha-sync-mode">>, <<"automatic">>}]),
- Fail("{\"ha-mode\":\"all\",\"ha-sync-mode\":\"made_up\"}"),
- Fail("{\"ha-sync-mode\":\"manual\"}"),
- Fail("{\"ha-sync-mode\":\"automatic\"}"),
-
- ok = control_action(clear_policy, [PolicyName]),
- passed.
-
-queue_master_location_policy_validation(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, queue_master_location_policy_validation1, [Config]).
-
-queue_master_location_policy_validation1(_Config) ->
- PolicyName = "queue_master_location_policy_validation-policy",
- Set = fun (JSON) ->
- control_action_opts(
- ["set_policy", PolicyName, ".*", JSON])
- end,
- OK = fun (JSON, Def) ->
- ok = Set(JSON),
- true = does_policy_exist(PolicyName, [{definition, Def}])
- end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
-
- OK ("{\"queue-master-locator\":\"min-masters\"}",
- [{<<"queue-master-locator">>, <<"min-masters">>}]),
- OK ("{\"queue-master-locator\":\"client-local\"}",
- [{<<"queue-master-locator">>, <<"client-local">>}]),
- OK ("{\"queue-master-locator\":\"random\"}",
- [{<<"queue-master-locator">>, <<"random">>}]),
- Fail("{\"queue-master-locator\":\"made_up\"}"),
-
- ok = control_action(clear_policy, [PolicyName]),
- passed.
-
-queue_modes_policy_validation(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, queue_modes_policy_validation1, [Config]).
-
-queue_modes_policy_validation1(_Config) ->
- PolicyName = "queue_modes_policy_validation-policy",
- Set = fun (JSON) ->
- control_action_opts(
- ["set_policy", PolicyName, ".*", JSON])
- end,
- OK = fun (JSON, Def) ->
- ok = Set(JSON),
- true = does_policy_exist(PolicyName, [{definition, Def}])
- end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
-
- OK ("{\"queue-mode\":\"lazy\"}",
- [{<<"queue-mode">>, <<"lazy">>}]),
- OK ("{\"queue-mode\":\"default\"}",
- [{<<"queue-mode">>, <<"default">>}]),
- Fail("{\"queue-mode\":\"wrong\"}"),
-
- ok = control_action(clear_policy, [PolicyName]),
- passed.
-
-vhost_removed_while_updating_policy(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, vhost_removed_while_updating_policy1, [Config]).
-
-vhost_removed_while_updating_policy1(_Config) ->
- VHost = "/vhost_removed_while_updating_policy-vhost",
- PolicyName = "vhost_removed_while_updating_policy-policy",
-
- ok = control_action(add_vhost, [VHost]),
- ok = control_action_opts(
- ["set_policy", "-p", VHost, PolicyName, ".*", "{\"ha-mode\":\"all\"}"]),
- true = does_policy_exist(PolicyName, []),
-
- %% Removing the vhost triggers the deletion of the policy. Once
- %% the policy and the vhost are actually removed, RabbitMQ calls
- %% update_policies() which lists policies on the given vhost. This
- %% obviously fails because the vhost is gone, but the call should
- %% still succeed.
- ok = control_action(delete_vhost, [VHost]),
- false = does_policy_exist(PolicyName, []),
-
- passed.
-
-does_policy_exist(PolicyName, Props) ->
- PolicyNameBin = list_to_binary(PolicyName),
- Policies = lists:filter(
- fun(Policy) ->
- lists:member({name, PolicyNameBin}, Policy)
- end, rabbit_policy:list()),
- case Policies of
- [Policy] -> check_policy_props(Policy, Props);
- [] -> false;
- _ -> false
- end.
-
-check_policy_props(Policy, [Prop | Rest]) ->
- case lists:member(Prop, Policy) of
- true -> check_policy_props(Policy, Rest);
- false -> false
- end;
-check_policy_props(_Policy, []) ->
- true.
-
-server_status(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, server_status1, [Config]).
-
-server_status1(Config) ->
- %% create a few things so there is some useful information to list
- {_Writer, Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
- [Q, Q2] = [Queue || {Name, Owner} <- [{<<"server_status-q1">>, none},
- {<<"server_status-q2">>, self()}],
- {new, Queue = #amqqueue{}} <-
- [rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/">>, queue, Name),
- false, false, [], Owner)]],
- ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined),
-
- %% list queues
- ok = info_action(list_queues,
- rabbit_amqqueue:info_keys(), true),
-
- %% as we have no way to collect output of
- %% info_action/3 call, the only way we
- %% can test individual queueinfoitems is by directly calling
- %% rabbit_amqqueue:info/2
- [{exclusive, false}] = rabbit_amqqueue:info(Q, [exclusive]),
- [{exclusive, true}] = rabbit_amqqueue:info(Q2, [exclusive]),
-
- %% list exchanges
- ok = info_action(list_exchanges,
- rabbit_exchange:info_keys(), true),
-
- %% list bindings
- ok = info_action(list_bindings,
- rabbit_binding:info_keys(), true),
- %% misc binding listing APIs
- [_|_] = rabbit_binding:list_for_source(
- rabbit_misc:r(<<"/">>, exchange, <<"">>)),
- [_] = rabbit_binding:list_for_destination(
- rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)),
- [_] = rabbit_binding:list_for_source_and_destination(
- rabbit_misc:r(<<"/">>, exchange, <<"">>),
- rabbit_misc:r(<<"/">>, queue, <<"server_status-q1">>)),
-
- %% list connections
- H = ?config(rmq_hostname, Config),
- P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
- {ok, C} = gen_tcp:connect(H, P, []),
- gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
- timer:sleep(100),
- ok = info_action(list_connections,
- rabbit_networking:connection_info_keys(), false),
- %% close_connection
- [ConnPid] = rabbit_ct_broker_helpers:get_connection_pids([C]),
- ok = control_action(close_connection,
- [rabbit_misc:pid_to_string(ConnPid), "go away"]),
-
- %% list channels
- ok = info_action(list_channels, rabbit_channel:info_keys(), false),
-
- %% list consumers
- ok = control_action(list_consumers, []),
-
- %% set vm memory high watermark
- HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
- ok = control_action(set_vm_memory_high_watermark, ["1"]),
- ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
- %% this will trigger an alarm
- ok = control_action(set_vm_memory_high_watermark, ["0.0"]),
- %% reset
- ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
-
- %% eval
- {error_string, _} = control_action(eval, ["\""]),
- {error_string, _} = control_action(eval, ["a("]),
- ok = control_action(eval, ["a."]),
-
- %% cleanup
- [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
-
- unlink(Ch),
- ok = rabbit_channel:shutdown(Ch),
-
- passed.
amqp_connection_refusal(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -3395,39 +2801,6 @@ configurable_server_properties1(_Config) ->
application:set_env(rabbit, server_properties, ServerProperties),
passed.
-memory_high_watermark(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, memory_high_watermark1, [Config]).
-
-memory_high_watermark1(_Config) ->
- %% set vm memory high watermark
- HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
- %% this will trigger an alarm
- ok = control_action(set_vm_memory_high_watermark,
- ["absolute", "2000"]),
- [{{resource_limit,memory,_},[]}] = rabbit_alarm:get_alarms(),
- %% reset
- ok = control_action(set_vm_memory_high_watermark,
- [float_to_list(HWM)]),
-
- passed.
-
-set_disk_free_limit_command(Config) ->
- passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, set_disk_free_limit_command1, [Config]).
-
-set_disk_free_limit_command1(_Config) ->
- ok = control_action(set_disk_free_limit,
- ["2000kiB"]),
- 2048000 = rabbit_disk_monitor:get_disk_free_limit(),
- ok = control_action(set_disk_free_limit,
- ["mem_relative", "1.1"]),
- ExpectedLimit = 1.1 * vm_memory_monitor:get_total_memory(),
- % Total memory is unstable, so checking order
- true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2,
- true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98,
- ok = control_action(set_disk_free_limit, ["50MB"]),
- passed.
disk_monitor(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
@@ -3715,83 +3088,6 @@ expect_event(Tag, Key, Type) ->
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
-control_action(Command, Args) ->
- control_action(Command, node(), Args, default_options()).
-
-control_action(Command, Args, NewOpts) ->
- control_action(Command, node(), Args,
- expand_options(default_options(), NewOpts)).
-
-control_action(Command, Node, Args, Opts) ->
- case catch rabbit_control_main:action(
- Command, Node, Args, Opts,
- fun (Format, Args1) ->
- io:format(Format ++ " ...~n", Args1)
- end) of
- ok ->
- io:format("done.~n"),
- ok;
- {ok, Result} ->
- rabbit_control_misc:print_cmd_result(Command, Result),
- ok;
- Other ->
- io:format("failed: ~p~n", [Other]),
- Other
- end.
-
-control_action_t(Command, Args, Timeout) when is_number(Timeout) ->
- control_action_t(Command, node(), Args, default_options(), Timeout).
-
-control_action_t(Command, Args, NewOpts, Timeout) when is_number(Timeout) ->
- control_action_t(Command, node(), Args,
- expand_options(default_options(), NewOpts),
- Timeout).
-
-control_action_t(Command, Node, Args, Opts, Timeout) when is_number(Timeout) ->
- case catch rabbit_control_main:action(
- Command, Node, Args, Opts,
- fun (Format, Args1) ->
- io:format(Format ++ " ...~n", Args1)
- end, Timeout) of
- ok ->
- io:format("done.~n"),
- ok;
- {ok, Result} ->
- rabbit_control_misc:print_cmd_result(Command, Result),
- ok;
- Other ->
- io:format("failed: ~p~n", [Other]),
- Other
- end.
-
-control_action_opts(Raw) ->
- NodeStr = atom_to_list(node()),
- case rabbit_control_main:parse_arguments(Raw, NodeStr) of
- {ok, {Cmd, Opts, Args}} ->
- case control_action(Cmd, node(), Args, Opts) of
- ok -> ok;
- Error -> Error
- end;
- Error ->
- Error
- end.
-
-info_action(Command, Args, CheckVHost) ->
- ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
- true -> ok
- end,
- ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
- {bad_argument, dummy} = control_action(Command, ["dummy"]),
- ok.
-
-info_action_t(Command, Args, CheckVHost, Timeout) when is_number(Timeout) ->
- if CheckVHost -> ok = control_action_t(Command, [], ["-p", "/"], Timeout);
- true -> ok
- end,
- ok = control_action_t(Command, lists:map(fun atom_to_list/1, Args), Timeout),
- ok.
-
default_options() -> [{"-p", "/"}, {"-q", "false"}].
expand_options(As, Bs) ->