summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-08-09 14:41:01 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-08-09 14:41:01 +0100
commitad9f941f95718e5cef9ee544d20a66e8c3b0b88c (patch)
tree9b7ff7b39d4c6ff93fd65b31f4296eb84028db6a
parente7a67da88293ebe5d8baf2f73eeb04d7d235a2dc (diff)
parent70fabf3eae712ee5e13f5131b5d8443b7147203c (diff)
downloadrabbitmq-server-git-ad9f941f95718e5cef9ee544d20a66e8c3b0b88c.tar.gz
Merge branch 'master' into rabbitmq-cli-207
-rwxr-xr-x.travis.sh258
-rw-r--r--.travis.yml65
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--scripts/rabbitmq-env.bat15
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat3
-rw-r--r--scripts/rabbitmq-service.bat5
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_amqqueue.erl64
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl6
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl19
-rw-r--r--src/rabbit_mirror_queue_misc.erl43
-rw-r--r--src/rabbit_reader.erl59
-rw-r--r--src/rabbit_recovery_terms.erl4
-rw-r--r--src/rabbit_variable_queue.erl19
-rw-r--r--src/rabbit_vhost.erl21
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vhost_msg_store.erl6
-rw-r--r--src/rabbit_vhost_process.erl1
-rw-r--r--src/rabbit_vhost_sup_sup.erl97
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl7
-rw-r--r--test/dynamic_ha_SUITE.erl106
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl33
-rw-r--r--test/vhost_SUITE.erl168
26 files changed, 822 insertions, 220 deletions
diff --git a/.travis.sh b/.travis.sh
new file mode 100755
index 0000000000..f5f786b907
--- /dev/null
+++ b/.travis.sh
@@ -0,0 +1,258 @@
+#!/usr/bin/env bash
+
+set -o nounset
+set -o errexit
+
+# Scratch file used as an on/off flag between ensure_otp and build_ticker.
+# Assigned before being marked read-only: `declare -r x="$(cmd)"` returns
+# the status of `declare`, which would hide a failing mktemp from errexit.
+tmp_file="$(mktemp)"
+declare -r tmp_file
+# First command-line argument selects the mode: 'tests', 'kiex_cleanup',
+# or otherwise the OTP tag to build. 'unset' is the placeholder when absent.
+declare -r script_arg="${1:-unset}"
+
+function onexit
+{
+ rm -vf "$tmp_file"
+}
+
+trap onexit EXIT
+
+# Entry point: run the cleanup, run a test group if asked to, and
+# otherwise install every build prerequisite in order.
+function main
+{
+    local step
+
+    # Exits the whole script here when script_arg is 'kiex_cleanup'.
+    kiex_cleanup
+
+    # Exits the whole script here when script_arg is 'tests'.
+    maybe_run_tests "$@"
+
+    # Order matters: directories first, OTP last.
+    for step in ensure_directories ensure_kerl ensure_kiex ensure_make ensure_otp
+    do
+        "$step"
+    done
+}
+
+function test_group_0
+{
+ make ct-backing_queue
+ make ct-channel_interceptor
+ make ct-channel_operation_timeout
+ make ct-cluster_formation_locking
+}
+
+function test_group_1
+{
+ make ct-clustering_management
+ make ct-cluster_rename
+ make ct-cluster
+ make ct-config_schema
+}
+
+function test_group_2
+{
+ make ct-crashing_queues
+ make ct-credential_validation
+ make ct-disconnect_detected_during_alarm
+ make ct-dynamic_ha
+}
+
+function test_group_3
+{
+ make ct-eager_sync
+ make ct-gm
+ make ct-health_check
+ make ct-lazy_queue
+}
+
+function test_group_4
+{
+ make ct-list_consumers_sanity_check
+ make ct-list_queues_online_and_offline
+ make ct-many_node_ha
+ make ct-metrics
+}
+
+function test_group_5
+{
+ make ct-mirrored_supervisor
+ make ct-msg_store
+ # TODO FUTURE HACK
+ # This suite fails frequently on Travis CI
+ # make ct-partitions
+ make ct-peer_discovery_dns
+}
+
+function test_group_6
+{
+ make ct-per_user_connection_tracking
+ make ct-per_vhost_connection_limit_partitions
+ make ct-per_vhost_connection_limit
+ make ct-per_vhost_msg_store
+}
+
+function test_group_7
+{
+ make ct-per_vhost_queue_limit
+ make ct-plugin_versioning
+ make ct-policy
+ make ct-priority_queue_recovery
+}
+
+function test_group_8
+{
+ make ct-priority_queue
+ make ct-proxy_protocol
+ make ct-queue_master_location
+ make ct-rabbit_core_metrics_gc
+}
+
+function test_group_9
+{
+ make ct-rabbitmqctl_integration
+ make ct-rabbitmqctl_shutdown
+ make ct-simple_ha
+ make ct-sup_delayed_restart
+}
+
+function test_group_10
+{
+ make ct-sync_detection
+ make ct-term_to_binary_compat_prop
+ make ct-topic_permission
+ make ct-unit_inbroker_non_parallel
+}
+
+function test_group_11
+{
+ make ct-unit_inbroker_parallel
+ make ct-unit
+ make ct-worker_pool
+}
+
+function maybe_run_tests
+{
+ if [[ $script_arg == 'tests' ]]
+ then
+ # Note: Travis env specifies test suite number
+ local -ri group="${2:-999}"
+
+ local -r test_func="test_group_$group"
+ "$test_func"
+
+ # Only doing tests, so early exit
+ exit 0
+ fi
+}
+
+# Delete *.bak* and *.old leftovers under ~/.kiex. When the script was
+# invoked as `kiex_cleanup` this is the whole job, so the script exits 0.
+function kiex_cleanup
+{
+    local -r _kc_root="$HOME/.kiex"
+
+    rm -vf "$_kc_root"/bin/*.bak*
+    rm -vf "$_kc_root"/elixirs/.*.old
+    rm -vf "$_kc_root"/elixirs/*.old
+    rm -vf "$_kc_root"/scripts/*.bak*
+
+    # Also called from main and ensure_kiex in other modes: just return.
+    [[ $script_arg == 'kiex_cleanup' ]] || return 0
+
+    # Only doing cleanup, so early exit
+    exit 0
+}
+
+
+# Create the per-user install directories and put $HOME/bin on PATH.
+function ensure_directories
+{
+    # -p makes this a no-op when the directories already exist, while
+    # real failures (e.g. permissions) still abort under errexit; no need
+    # to switch errexit off and swallow every error.
+    mkdir -p "$HOME/otp" "$HOME/bin"
+    export PATH="$HOME/bin:$PATH"
+}
+
+# Download the kerl script into $HOME/bin and make it executable.
+function ensure_kerl
+{
+    # --fail: on an HTTP error curl exits non-zero instead of saving the
+    # server's error page as "kerl", so errexit stops the script here.
+    curl --fail -Lo "$HOME/bin/kerl" https://raw.githubusercontent.com/kerl/kerl/master/kerl
+    chmod 755 "$HOME/bin/kerl"
+}
+
+# Run the kiex installer, then load kiex into this shell.
+# Exits 1 if the installer did not leave a non-empty kiex script behind.
+function ensure_kiex
+{
+    local -r kiex_script="$HOME/.kiex/scripts/kiex"
+
+    curl -sSL https://raw.githubusercontent.com/taylor/kiex/master/install | /usr/bin/env bash -s
+
+    if [[ ! -s $kiex_script ]]
+    then
+        echo "Did not find kiex at $kiex_script" 1>&2
+        exit 1
+    fi
+
+    source "$kiex_script"
+    # Note: this produces a lot of output but without running
+    # "list known" first, kiex install ... sometimes fails
+    kiex list known
+    kiex_cleanup
+}
+
+# Make sure GNU Make is installed under $HOME/gmake and first on PATH.
+# Builds it from the release tarball when no executable is found there.
+function ensure_make
+{
+    # GNU Make build variables
+    local -r make_version='4.2.1'
+    local -r make_install_dir="$HOME/gmake"
+    local -r make_bin_dir="$make_install_dir/bin"
+    local -r make_src_dir="make-$make_version"
+    local -r make_tarball="$make_src_dir.tar.gz"
+
+    export PATH="$make_bin_dir:$PATH"
+
+    if [[ -x $make_bin_dir/make ]]
+    then
+        echo "Found GNU Make installation at $make_install_dir"
+    else
+        mkdir -p "$make_install_dir"
+        # HTTPS plus --fail: the tarball is compiled and executed, so do
+        # not fetch it in clear text, and stop on an HTTP error instead of
+        # handing an error page to tar.
+        curl --fail -sSLO "https://ftp.gnu.org/gnu/make/$make_tarball"
+        tar xf "$make_tarball"
+        pushd "$make_src_dir"
+        ./configure --prefix="$make_install_dir"
+        # $make_bin_dir/make does not exist yet, so these two calls
+        # resolve to whichever make is already on PATH.
+        make
+        make install
+        popd
+    fi
+}
+
+# Progress reporter, started as a background job by ensure_otp.
+# Every 10 seconds, while the flag file holds 'true', print a separator,
+# a timestamp and the tail of any OTP build logs. Reads otp_tag_name and
+# otp_build_log_dir, which are locals of the calling ensure_otp.
+function build_ticker
+{
+    local status
+
+    while true
+    do
+        status=$(< "$tmp_file")
+        [[ $status == 'true' ]] || break
+
+        echo '------------------------------------------------------------------------------------------------------------------------------------------------'
+        echo "$(date) building $otp_tag_name ..."
+        # Only tail once at least one build log exists.
+        if ls "$otp_build_log_dir"/otp_build*.log > /dev/null
+        then
+            tail "$otp_build_log_dir"/otp_build*.log
+        fi
+        sleep 10
+    done
+    echo '.'
+}
+
+function ensure_otp
+{
+ # OTP build variables
+ local -r otp_tag_name="$script_arg"
+ local -r otp_build_log_dir="$HOME/.kerl/builds/$otp_tag_name"
+ local -r otp_install_dir="$HOME/otp/$otp_tag_name"
+ if [[ -s $otp_install_dir/activate ]]
+ then
+ echo "Found OTP installation at $otp_install_dir"
+ else
+ export KERL_CONFIGURE_OPTIONS='--enable-hipe --enable-smp-support --enable-threads --enable-kernel-poll'
+ rm -rf "$otp_install_dir"
+ mkdir -p "$otp_install_dir"
+
+ echo -n 'true' > "$tmp_file"
+ build_ticker &
+ kerl build git https://github.com/erlang/otp.git "$otp_tag_name" "$otp_tag_name"
+ echo -n 'false' > "$tmp_file"
+ wait
+
+ kerl install "$otp_tag_name" "$otp_install_dir"
+ fi
+}
+
+main "$@"
diff --git a/.travis.yml b/.travis.yml
index 7a46f085b5..41f45de163 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,32 +1,34 @@
# vim:sw=2:et:
+sudo: false
-# Use a real VM so we can install all the packages we want.
-sudo: required
+language: generic
-language: erlang
-notifications:
- email:
- - alerts@rabbitmq.com
addons:
apt:
- sources:
- - sourceline: deb https://packages.erlang-solutions.com/ubuntu precise contrib
- key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc
packages:
- # Use Elixir from Erlang Solutions. The provided Elixir is
- # installed with kiex but is old. By using an prebuilt Debian
- # package, we save the compilation time.
- - elixir
- - xsltproc
+ - unixodbc
+ - unixodbc-dev
+ - libwxgtk2.8-dev
+
otp_release:
- - "19.2"
- - "19.3"
+ - "20.0"
+
services:
- docker
+
env:
- matrix:
- - GROUP=1
- - GROUP=2
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=0
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=1
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=2
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=3
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=4
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=5
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=6
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=7
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=8
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=9
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=10
+ - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=11
before_script:
# The checkout made by Travis is a "detached HEAD" and branches
@@ -40,13 +42,26 @@ before_script:
git remote add upstream https://github.com/$TRAVIS_REPO_SLUG.git
git fetch upstream stable:stable || :
git fetch upstream master:master || :
- # Remove all kiex installations. This makes sure that the Erlang
- # Solutions one is picked: it's after the kiex installations in $PATH.
- - echo YES | kiex implode
+ # Install kerl; build gmake 4.2.1 and OTP
+ - $TRAVIS_BUILD_DIR/.travis.sh $OTP_TAG_NAME
+ - export PATH="$HOME/bin:$HOME/gmake/bin:$PATH"
+ - source "$HOME/otp/$OTP_TAG_NAME/activate"
+ - kerl active
+ - test -s "$HOME/.kiex/scripts/kiex" && source "$HOME/.kiex/scripts/kiex"
+ - test -x "$HOME/.kiex/elixirs/elixir-1.4.5/bin/elixir" || kiex install 1.4.5
+ - kiex use 1.4.5 --default
+ - mix local.hex --force
+ - make --version
script:
- - if test "${GROUP}" = '1'; then make tests; fi
- - if test "${GROUP}" = '2'; then sh ./scripts/travis_test_ocf_ra.sh; fi
+ - $TRAVIS_BUILD_DIR/.travis.sh tests $TEST_SUITE
+
+before_cache:
+ - $TRAVIS_BUILD_DIR/.travis.sh kiex_cleanup
cache:
- apt: true
+ directories:
+ - "$HOME/otp"
+ - "$HOME/.kiex"
+ - "$HOME/gmake"
+ - "$HOME/bin"
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index d74a3b173a..e422557cac 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -80,7 +80,7 @@ ESCRIPT_DIR="${RABBITMQ_HOME}/escript"
DEFAULT_SCHEDULER_BIND_TYPE="db"
[ "x" = "x$RABBITMQ_SCHEDULER_BIND_TYPE" ] && RABBITMQ_SCHEDULER_BIND_TYPE=${DEFAULT_SCHEDULER_BIND_TYPE}
-DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
[ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
## Common defaults
@@ -237,6 +237,7 @@ rmq_normalize_path_var RABBITMQ_PLUGINS_DIR
[ "x" != "x$RABBITMQ_LOGS" ] && export RABBITMQ_LOGS_source=environment
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
[ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log"
+[ "x" = "x$ERL_CRASH_DUMP" ] && ERL_CRASH_DUMP="${RABBITMQ_LOG_BASE}/erl_crash.dump"
rmq_normalize_path_var RABBITMQ_LOGS
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index e3fddbb3eb..335e39c2fb 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -38,10 +38,10 @@ if "!RABBITMQ_SCHEDULER_BIND_TYPE!"=="" (
set RABBITMQ_SCHEDULER_BIND_TYPE=!DEFAULT_SCHEDULER_BIND_TYPE!
)
-REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
REM set the VM distribution buffer size
REM [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
-set DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+set DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
if "!RABBITMQ_DISTRIBUTION_BUFFER_SIZE!"=="" (
set RABBITMQ_DISTRIBUTION_BUFFER_SIZE=!DEFAULT_DISTRIBUTION_BUFFER_SIZE!
)
@@ -316,13 +316,22 @@ if "!RABBITMQ_LOGS!"=="" (
set RABBITMQ_LOGS=!LOGS!
)
)
-if not "!RABBITMQ_LOGS" == "-" (
+if not "!RABBITMQ_LOGS!" == "-" (
if not exist "!RABBITMQ_LOGS!" (
for /f "delims=" %%F in ("!RABBITMQ_LOGS!") do mkdir %%~dpF 2>NUL
copy /y NUL "!RABBITMQ_LOGS!" >NUL
)
for /f "delims=" %%F in ("!RABBITMQ_LOGS!") do set RABBITMQ_LOGS=%%~sF
)
+rem [ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log"
+if "!RABBITMQ_UPGRADE_LOG!" == "" (
+ set RABBITMQ_UPGRADE_LOG=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!_upgrade.log
+)
+
+REM [ "x" = "x$ERL_CRASH_DUMP" ] && ERL_CRASH_DUMP="${RABBITMQ_LOG_BASE}/erl_crash.dump"
+if "!ERL_CRASH_DUMP!"=="" (
+ set ERL_CRASH_DUMP=!RABBITMQ_LOG_BASE!\erl_crash.dump
+)
REM [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
if "!$RABBITMQ_CTL_ERL_ARGS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index d307e79138..4edec5b470 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -84,6 +84,7 @@ RABBITMQ_PRELAUNCH_NODENAME="rabbitmqprelaunch${$}@localhost"
# success of our startup sequence to systemd.
NOTIFY_SOCKET= \
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
+ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
RABBITMQ_DIST_PORT=$RABBITMQ_DIST_PORT \
${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
-boot "${CLEAN_BOOT_FILE}" \
@@ -203,6 +204,7 @@ start_rabbitmq_server() {
check_start_params &&
RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \
ERL_MAX_ETS_TABLES=$ERL_MAX_ETS_TABLES \
+ ERL_CRASH_DUMP=$ERL_CRASH_DUMP \
exec ${ERL_DIR}erl \
-pa ${RABBITMQ_SERVER_CODE_PATH} ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index ea417dcad4..54e60a8847 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -130,9 +130,11 @@ REM particularly useful for Docker images.
if "!RABBITMQ_LOGS!" == "-" (
set SASL_ERROR_LOGGER=tty
set RABBIT_LAGER_HANDLER=tty
+ set RABBITMQ_LAGER_HANDLER_UPGRADE=tty
) else (
set SASL_ERROR_LOGGER=false
set RABBIT_LAGER_HANDLER=\""!RABBITMQ_LOGS:\=/!"\"
+ set RABBITMQ_LAGER_HANDLER_UPGRADE=\""!RABBITMQ_UPGRADE_LOG:\=/!"\"
)
set RABBITMQ_START_RABBIT=
@@ -178,6 +180,7 @@ if "!ENV_OK!"=="false" (
-sasl sasl_error_logger !SASL_ERROR_LOGGER! ^
-rabbit lager_log_root \""!RABBITMQ_LOG_BASE:\=/!"\" ^
-rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
+-rabbit lager_handler_upgrade !RABBITMQ_LAGER_HANDLER_UPGRADE! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 2f118205ab..b80271bc9a 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -217,9 +217,13 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
if "!RABBITMQ_LOGS!" == "-" (
+ set SASL_ERROR_LOGGER=tty
set RABBIT_LAGER_HANDLER=tty
+ set RABBITMQ_LAGER_HANDLER_UPGRADE=tty
) else (
+ set SASL_ERROR_LOGGER=false
set RABBIT_LAGER_HANDLER=\""!RABBITMQ_LOGS:\=/!"\"
+ set RABBITMQ_LAGER_HANDLER_UPGRADE=\""!RABBITMQ_UPGRADE_LOG:\=/!"\"
)
set RABBITMQ_START_RABBIT=
@@ -255,6 +259,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit lager_log_root \""!RABBITMQ_LOG_BASE:\=/!"\" ^
-rabbit lager_handler !RABBIT_LAGER_HANDLER! ^
+-rabbit lager_handler_upgrade !RABBITMQ_LAGER_HANDLER_UPGRADE! ^
-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fd2f980455..0a0eb6b71a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -168,12 +168,6 @@
{requires, recovery},
{enables, routing_ready}]}).
--rabbit_boot_step({mirrored_queues,
- [{description, "adding mirrors to queues"},
- {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, core_initialized}]}).
@@ -803,6 +797,16 @@ start(normal, []) ->
warn_if_disc_io_options_dubious(),
rabbit_boot_steps:run_boot_steps(),
{ok, SupPid};
+ {error, {erlang_version_too_old,
+ {found, OTPRel, ERTSVer},
+ {required, ?OTP_MINIMUM, ?ERTS_MINIMUM}}} ->
+ Msg = "This RabbitMQ version cannot run on Erlang ~s (erts ~s): "
+ "minimum required version is ~s (erts ~s)",
+ Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM],
+ rabbit_log:error(Msg, Args),
+ %% also print to stderr to make this more visible
+ io:format(standard_error, "Error: " ++ Msg ++ "~n", Args),
+ {error, {erlang_version_too_old, rabbit_misc:format("Erlang ~s or later is required, started on ~s", [?OTP_MINIMUM, OTPRel])}};
Error ->
Error
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f76566ffb9..2d85a9f04b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,6 +41,7 @@
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -256,6 +257,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+%% Set the state of every durable queue returned by
+%% find_durable_queues(VHost) to 'stopped', skipping those already
+%% stopped. The writes happen in one mnesia transaction; the result is
+%% the list of store_queue/1 return values.
+mark_local_durable_queues_stopped(VHost) ->
+    NotStopped = [Q || Q = #amqqueue{state = State} <- find_durable_queues(VHost),
+                       State =/= stopped],
+    rabbit_misc:execute_mnesia_transaction(
+      fun() ->
+              [store_queue(Q#amqqueue{state = stopped}) || Q <- NotStopped]
+      end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -330,11 +340,17 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
{ok, Node0} -> Node0;
{error, _} -> Node
end,
-
Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
- gen_server2:call(
- rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
- {init, new}, infinity).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of
+ {ok, _} ->
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity);
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot declare a queue '~s' on node '~s': ~255p",
+ [rabbit_misc:rs(QueueName), Node1, Error])
+ end.
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -447,13 +463,28 @@ with(Name, F, E) ->
with(Name, F, E, RetriesLeft) ->
case lookup(Name) of
- {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ {ok, Q = #amqqueue{state = live}} when RetriesLeft =:= 0 ->
%% Something bad happened to that queue, we are bailing out
%% on processing current request.
E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = stopped}} when RetriesLeft =:= 0 ->
+ %% The queue was stopped and not migrated
+ E({absent, Q, stopped});
+ %% The queue process has crashed with unknown error
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
- {ok, Q = #amqqueue{pid = QPid}} ->
+ %% The queue process has been stopped by a supervisor.
+ %% In that case a synchronised slave can take over
+ %% so we should retry.
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ rabbit_misc:with_exit_handler(
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
+ %% The queue is supposed to be active.
+ %% The master node can go away or queue can be killed
+ %% so we retry, waiting for a slave to take over.
+ {ok, Q = #amqqueue{state = live}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
%% with the QPid. F() should be written s.t. that this
@@ -461,14 +492,24 @@ with(Name, F, E, RetriesLeft) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_mnesia:is_process_alive(QPid),
- timer:sleep(30),
- with(Name, F, E, RetriesLeft - 1)
- end, fun () -> F(Q) end);
+ fun () -> retry_wait(Q, F, E, RetriesLeft) end,
+ fun () -> F(Q) end);
{error, not_found} ->
E(not_found_or_absent_dirty(Name))
end.
+%% Decide what to do after an operation on queue Q exited.
+%% A stopped queue that is not mirrored is reported through E as
+%% {absent, Q, stopped}; in every other case wait 30ms and re-enter
+%% with/4 with one retry fewer.
+retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, RetriesLeft) ->
+    case is_mirrored(Q) of
+        %% We don't want to repeat an operation if
+        %% there are no slaves to migrate to
+        false when QState =:= stopped ->
+            E({absent, Q, stopped});
+        _ ->
+            %% Assertive match: badmatch if the queue process is alive.
+            false = rabbit_mnesia:is_process_alive(QPid),
+            timer:sleep(30),
+            with(Name, F, E, RetriesLeft - 1)
+    end.
+
with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
with_or_die(Name, F) ->
@@ -655,10 +696,13 @@ is_unresponsive(#amqqueue{ pid = QPid }, Timeout) ->
end.
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index b5ef86255d..f0bcbd7c60 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -57,7 +57,7 @@ find_for_vhost(VHost) ->
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
@@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
supervisor2:start_child(
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 3ae17677e0..ca13700da0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
- " because the vhost database has stopped working",
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
ok.
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- rabbit_networking:close_connection(Pid, Message);
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could have been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 59522da4a9..a6571defcb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
+-export([remove_from_queue/3, on_vhost_up/1, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
@@ -53,7 +53,6 @@
-spec remove_from_queue
(rabbit_amqqueue:name(), pid(), [pid()]) ->
{'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}.
--spec on_node_up() -> 'ok'.
-spec add_mirrors(rabbit_amqqueue:name(), [node()], 'sync' | 'async') ->
'ok'.
-spec store_updated_slaves(rabbit_types:amqqueue()) ->
@@ -167,12 +166,16 @@ slaves_to_start_on_failure(Q, DeadGMPids) ->
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
NewNodes -- OldNodes.
-on_node_up() ->
+on_vhost_up(VHost) ->
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName,
+ fun
+ (#amqqueue{name = #resource{virtual_host = OtherVhost}},
+ QNames0) when OtherVhost =/= VHost ->
+ QNames0;
+ (Q = #amqqueue{name = QName,
pid = Pid,
slave_pids = SPids}, QNames0) ->
%% We don't want to pass in the whole
@@ -228,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup_sup:start_queue_process(
- MirrorNode, Q, slave),
- log_info(QName, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
+ {ok, _} ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ {error, Error} ->
+ log_warning(QName,
+ "Unable to start queue mirror on node '~p'. "
+ "Target virtual host is not running: ~p~n",
+ [MirrorNode, Error]),
+ ok
+ end
end);
{error, not_found} = E ->
E
@@ -249,12 +262,12 @@ report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
rabbit_misc:pid_to_string(MirrorPid),
[[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).
-log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args).
-log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args).
-
-log(Level, QName, Fmt, Args) ->
- rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
- [rabbit_misc:rs(QName) | Args]).
+%% Mirroring log helpers: prefix the message with "Mirrored <queue>: "
+%% and log it through rabbit_log_mirroring at info / warning level.
+log_info(QName, Fmt, Args) ->
+    PrefixedFmt = "Mirrored ~s: " ++ Fmt,
+    PrefixedArgs = [rabbit_misc:rs(QName) | Args],
+    rabbit_log_mirroring:info(PrefixedFmt, PrefixedArgs).
+log_warning(QName, Fmt, Args) ->
+    PrefixedFmt = "Mirrored ~s: " ++ Fmt,
+    PrefixedArgs = [rabbit_misc:rs(QName) | Args],
+    rabbit_log_mirroring:warning(PrefixedFmt, PrefixedArgs).
store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
sync_slave_pids = SSPids,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 77914a00bf..6e2ed2a889 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -279,18 +279,20 @@ socket_error(Reason) when is_atom(Reason) ->
rabbit_log_connection:error("Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- Level =
- case Reason of
- {ssl_upgrade_error, closed} ->
- %% The socket was closed while upgrading to SSL.
- %% This is presumably a TCP healthcheck, so don't log
- %% it unless specified otherwise.
- debug;
- _ ->
- error
- end,
- rabbit_log:log(rabbit_log_connection, Level,
- "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ Fmt = "Error on AMQP connection ~p:~n~p~n",
+ Args = [self(), Reason],
+ case Reason of
+ %% The socket was closed while upgrading to SSL.
+ %% This is presumably a TCP healthcheck, so don't log
+ %% it unless specified otherwise.
+ {ssl_upgrade_error, closed} ->
+ %% Lager sinks (rabbit_log_connection)
+ %% are handled by the lager parse_transform.
+ %% Hence have to define the loglevel as a function call.
+ rabbit_log_connection:debug(Fmt, Args);
+ _ ->
+ rabbit_log_connection:error(Fmt, Args)
+ end.
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -402,31 +404,41 @@ log_connection_exception(Name, Ex) ->
log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
%% Long line to avoid extra spaces and line breaks in log
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n"
"missed heartbeats from client, timeout: ~ps~n",
[self(), Name, TimeoutSec]);
log_connection_exception(Severity, Name, {connection_closed_abruptly,
#v1{connection = #connection{user = #user{username = Username},
vhost = VHost}}}) ->
- rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
[self(), Name, VHost, Username]);
%% when client abruptly closes connection before connection.open/authentication/authorization
%% succeeded, don't log username and vhost as 'none'
log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) ->
- rabbit_log:log(rabbit_log_connection, Severity, "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
[self(), Name]);
%% old exception structure
log_connection_exception(Severity, Name, connection_closed_abruptly) ->
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n"
"client unexpectedly closed TCP connection~n",
[self(), Name]);
log_connection_exception(Severity, Name, Ex) ->
- rabbit_log:log(rabbit_log_connection, Severity,
+ log_connection_exception_with_severity(Severity,
"closing AMQP connection ~p (~s):~n~p~n",
[self(), Name, Ex]).
+%% Log Fmt/Args to the connection log at the level named by Severity
+%% (debug | info | warning | error). Each level is an explicit call to
+%% the matching rabbit_log_connection function; any other severity
+%% fails with case_clause.
+log_connection_exception_with_severity(Severity, Fmt, Args) ->
+    case Severity of
+        debug   -> rabbit_log_connection:debug(Fmt, Args);
+        info    -> rabbit_log_connection:info(Fmt, Args);
+        warning -> rabbit_log_connection:warning(Fmt, Args);
+        %% errors are logged at error level, not downgraded to warning
+        error   -> rabbit_log_connection:error(Fmt, Args)
+    end.
+
run({M, F, A}) ->
try apply(M, F, A)
catch {become, MFA} -> run(MFA)
@@ -475,13 +487,12 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
%%
%% The goal is to not log TCP healthchecks (a connection
%% with no data received) unless specified otherwise.
- Level = case Recv of
- closed -> debug;
- _ -> info
- end,
- rabbit_log:log(rabbit_log_connection, Level,
- "accepting AMQP connection ~p (~s)~n",
- [self(), ConnName]);
+ Fmt = "accepting AMQP connection ~p (~s)~n",
+ Args = [self(), ConnName],
+ case Recv of
+ closed -> rabbit_log_connection:debug(Fmt, Args);
+ _ -> rabbit_log_connection:info(Fmt, Args)
+ end;
_ ->
ok
end,
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 73fc9c7449..b73f3add7c 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,7 +48,7 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
{ok, _} = supervisor2:start_child(
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
ok.
stop(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
case supervisor:terminate_child(VHostSup, ?MODULE) of
ok -> supervisor:delete_child(VHostSup, ?MODULE);
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b4945fe3d3..4d61bb4b03 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2766,9 +2766,15 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
case list_persistent_queues() of
- % [] -> ok;
- Queues -> move_messages_to_vhost_store(Queues)
- end.
+ [] ->
+ log_upgrade("No durable queues found."
+ " Skipping message store migration"),
+ ok;
+ Queues ->
+ move_messages_to_vhost_store(Queues)
+ end,
+ ok = delete_old_store(),
+ ok = rabbit_queue_index:cleanup_global_recovery_terms().
move_messages_to_vhost_store(Queues) ->
log_upgrade("Moving messages to per-vhost message store"),
@@ -2802,8 +2808,7 @@ move_messages_to_vhost_store(Queues) ->
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
log_upgrade("Message store migration finished"),
- ok = delete_old_store(OldStore),
- ok = rabbit_queue_index:cleanup_global_recovery_terms(),
+ ok = rabbit_sup:stop_child(OldStore),
[ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
ok = stop_new_store(NewMsgStore).
@@ -2936,8 +2941,8 @@ stop_new_store(NewStore) ->
NewStore),
ok.
-delete_old_store(OldStore) ->
- ok = rabbit_sup:stop_child(OldStore),
+delete_old_store() ->
+ log_upgrade("Removing the old message store data"),
rabbit_file:recursive_delete(
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
%% Delete old transient store as well
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 30557fc7be..73c05389be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -71,11 +71,13 @@ recover(VHost) ->
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
[QName || #amqqueue{name = QName} <- Qs]),
ok = rabbit_amqqueue:start(Qs),
+ %% Start queue mirrors.
+ ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, tracing, state]).
+-define(INFO_KEYS, [name, tracing, cluster_state]).
add(VHostPath, ActingUser) ->
rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
@@ -261,10 +263,19 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
i(tracing, VHost) -> rabbit_trace:enabled(VHost);
-i(state, VHost) -> case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
- true -> running;
- false -> down
- end;
+i(cluster_state, VHost) ->
+ Nodes = rabbit_nodes:all_running(),
+ lists:map(fun(Node) ->
+ State = case rabbit_misc:rpc_call(Node,
+ rabbit_vhost_sup_sup, is_vhost_alive,
+ [VHost]) of
+ {badrpc, nodedown} -> nodedown;
+ true -> running;
+ false -> stopped
+ end,
+ {Node, State}
+ end,
+ Nodes);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 7b797e46b2..9d8a6795b4 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
- update_vhost(VHost, undefined).
+ %% If the function is called as a part of vhost deletion, the vhost can
+ %% be already deleted.
+ case rabbit_vhost:exists(VHost) of
+ true -> update_vhost(VHost, undefined);
+ false -> ok
+ end.
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 3c633875bc..b9af37c258 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,7 +23,7 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
end.
stop(VHost, Type) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, Type),
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
end.
vhost_store_pid(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
case supervisor2:find_child(VHostSup, Type) of
[Pid] -> Pid;
[] -> no_pid
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 1d5db93fda..19d7cf61b7 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,15 +23,16 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
--export([delete_on_all_nodes/1]).
--export([start_on_all_nodes/1]).
-
--export([save_vhost_process/2]).
+-export([init_vhost/1,
+ start_vhost/1, start_vhost/2,
+ get_vhost_sup/1, get_vhost_sup/2,
+ save_vhost_sup/3,
+ save_vhost_process/2]).
+-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1, start_vhost/1]).
+-export([stop_and_delete_vhost/1]).
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
@@ -61,7 +62,12 @@ init([]) ->
start_on_all_nodes(VHost) ->
NodesStart = [ {Node, start_vhost(VHost, Node)}
|| Node <- rabbit_nodes:all_running() ],
- Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ Failures = lists:filter(fun
+ ({_, {ok, _}}) -> false;
+ ({_, {error, {already_started, _}}}) -> false;
+ (_) -> true
+ end,
+ NodesStart),
case Failures of
[] -> ok;
Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
@@ -72,7 +78,7 @@ delete_on_all_nodes(VHost) ->
ok.
stop_and_delete_vhost(VHost) ->
- case get_vhost_sup(VHost) of
+ StopResult = case lookup_vhost_sup_record(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +90,15 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete(?MODULE, VHost),
- ok = rabbit_vhost:delete_storage(VHost);
+ true = ets:delete(?MODULE, VHost),
+ ok;
Other ->
Other
end
end
- end.
+ end,
+ ok = rabbit_vhost:delete_storage(VHost),
+ StopResult.
%% We take an optimistic approach when stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,10 +114,15 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec init_vhost(rabbit_types:vhost()) -> ok.
+-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhost()}}.
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
+ {error, {already_started, _}} ->
+ rabbit_log:warning(
+ "Attempting to start an already started vhost '~s'.",
+ [VHost]),
+ ok;
{error, {no_such_vhost, VHost}} ->
{error, {no_such_vhost, VHost}};
{error, Reason} ->
@@ -130,58 +143,54 @@ init_vhost(VHost) ->
end
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
-vhost_sup(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
+ {vhost_supervisor_not_running, rabbit_types:vhost()}.
+
+-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
+get_vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
+ {error, Err} ->
+ {error, Err};
{badrpc, RpcErr} ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
-vhost_sup(VHost) ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case start_vhost(VHost) of
- {ok, Pid} ->
- true = is_vhost_alive(VHost),
- {ok, Pid};
- {error, {no_such_vhost, VHost}} ->
- {error, {no_such_vhost, VHost}};
- Error ->
- throw(Error)
- end;
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
+-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
+get_vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false ->
+ {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ {error, {vhost_supervisor_not_running, VHost}};
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
end.
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
+ {ok, Pid} -> {ok, Pid};
+ {error, Err} -> {error, Err};
+ {badrpc, RpcErr} -> {error, RpcErr}
end.
-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_vhost(VHost) ->
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
- true ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, Pid} -> {ok, Pid};
- {error, {already_started, Pid}} -> {ok, Pid};
- {error, Err} -> {error, Err}
- end
+ true -> supervisor2:start_child(?MODULE, [VHost])
end.
-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
is_vhost_alive(VHost) ->
%% A vhost is considered alive if its supervision tree is alive and
%% saved in the ETS table
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
@@ -210,8 +219,8 @@ save_vhost_process(VHost, VHostProcessPid) ->
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.
--spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
-get_vhost_sup(VHost) ->
+-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+lookup_vhost_sup_record(VHost) ->
case ets:lookup(?MODULE, VHost) of
[] -> not_found;
[#vhost_sup{} = VHostSup] -> VHostSup
@@ -219,7 +228,7 @@ get_vhost_sup(VHost) ->
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
not_found ->
no_pid;
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8e23389bb9..4ae68cdd75 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -29,7 +29,12 @@
start_link(VHost) ->
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
- supervisor:start_link(?MODULE, [VHost]).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
+ {ok, Pid} ->
+ {error, {already_started, Pid}};
+ {error, _} ->
+ supervisor:start_link(?MODULE, [VHost])
+ end.
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index b2f212fe75..c70f23c066 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -57,13 +57,19 @@ groups() ->
{clustered, [], [
{cluster_size_2, [], [
vhost_deletion,
- promote_on_shutdown
+ promote_on_shutdown,
+ slave_recovers_after_vhost_failure,
+ slave_recovers_after_vhost_down_an_up,
+ master_migrates_on_vhost_down,
+ slave_recovers_after_vhost_down_and_master_migrated,
+ queue_survive_adding_dead_vhost_mirror
]},
{cluster_size_3, [], [
change_policy,
rapid_change,
nodes_policy_should_pick_master_from_its_params,
- promote_slave_after_standalone_restart
+ promote_slave_after_standalone_restart,
+ queue_survive_adding_dead_vhost_mirror
% FIXME: Re-enable those tests when the known issues are
% fixed.
% failing_random_policies,
@@ -218,6 +224,19 @@ rapid_loop(Config, Node, MRef) ->
rapid_loop(Config, Node, MRef)
end.
+queue_survive_adding_dead_vhost_mirror(Config) ->
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 1, <<"/">>),
+ NodeA = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ QName = <<"queue_survive_adding_dead_vhost_mirror-q-1">>,
+ amqp_channel:call(ChA, #'queue.declare'{queue = QName}),
+ Q = find_queue(QName, NodeA),
+ Pid = proplists:get_value(pid, Q),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ %% Queue should not fail
+ Q1 = find_queue(QName, NodeA),
+ Pid = proplists:get_value(pid, Q1).
+
%% Vhost deletion needs to successfully tear down policies and queues
%% with policies. At least smoke-test that it doesn't blow up.
vhost_deletion(Config) ->
@@ -303,6 +322,72 @@ nodes_policy_should_pick_master_from_its_params(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}),
_ = rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY).
+slave_recovers_after_vhost_failure(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_failure-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(300),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on a node hosting a mirror
+ {ok, Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, get_vhost_sup, [<<"/">>]),
+ exit(Sup, foo),
+
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]).
+
+slave_recovers_after_vhost_down_an_up(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_down_an_up-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on a node hosting a mirror
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, B, <<"/">>),
+ %% Vhost is down now
+ false = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, is_vhost_alive, [<<"/">>]),
+ timer:sleep(300),
+ %% Vhost is back up
+ {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, B, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
+
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]).
+
+master_migrates_on_vhost_down(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"master_migrates_on_vhost_down-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+
+ %% Crash vhost on the node hosting queue master
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
+ timer:sleep(300),
+ assert_slaves(A, QName, {B, []}).
+
+slave_recovers_after_vhost_down_and_master_migrated(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+ QName = <<"slave_recovers_after_vhost_down_and_master_migrated-q">>,
+ amqp_channel:call(ACh, #'queue.declare'{queue = QName}),
+ timer:sleep(100),
+ assert_slaves(A, QName, {A, [B]}, [{A, []}]),
+ %% Crash vhost on the node hosting queue master
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, A, <<"/">>),
+ timer:sleep(300),
+ assert_slaves(B, QName, {B, []}),
+
+ %% Restart the vhost on the node (previously) hosting queue master
+ {ok, _Sup} = rabbit_ct_broker_helpers:rpc(Config, A, rabbit_vhost_sup_sup, start_vhost, [<<"/">>]),
+ timer:sleep(300),
+ assert_slaves(B, QName, {B, [A]}, [{B, []}]).
+
random_policy(Config) ->
run_proper(fun prop_random_policy/1, [Config]).
@@ -341,7 +426,7 @@ promote_slave_after_standalone_restart(Config) ->
rabbit_ct_broker_helpers:stop_node(Config, B),
rabbit_ct_broker_helpers:stop_node(Config, A),
- %% Restart one slave
+ %% Restart one mirror
forget_cluster_node(Config, B, C),
forget_cluster_node(Config, B, A),
@@ -367,9 +452,11 @@ assert_slaves(RPCNode, QName, Exp) ->
assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) ->
assert_slaves0(RPCNode, QName, Exp,
[{get(previous_exp_m_node), get(previous_exp_s_nodes)} |
- PermittedIntermediate]).
+ PermittedIntermediate], 1000).
-assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) ->
+assert_slaves0(_, _, _, _, 0) ->
+ error(give_up_waiting_for_slaves);
+assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts) ->
Q = find_queue(QName, RPCNode),
Pid = proplists:get_value(pid, Q),
SPids = proplists:get_value(slave_pids, Q),
@@ -395,7 +482,8 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) ->
[State, {ExpMNode, ExpSNodes}]),
timer:sleep(100),
assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes},
- PermittedIntermediate)
+ PermittedIntermediate,
+ Attempts - 1)
end;
true ->
put(previous_exp_m_node, ExpMNode),
@@ -415,10 +503,14 @@ equal_list([H|T], Act) -> case lists:member(H, Act) of
end.
find_queue(QName, RPCNode) ->
+ find_queue(QName, RPCNode, 1000).
+
+find_queue(QName, RPCNode, 0) -> error({did_not_find_queue, QName, RPCNode});
+find_queue(QName, RPCNode, Attempts) ->
Qs = rpc:call(RPCNode, rabbit_amqqueue, info_all, [?VHOST], infinity),
case find_queue0(QName, Qs) of
did_not_find_queue -> timer:sleep(100),
- find_queue(QName, RPCNode);
+ find_queue(QName, RPCNode, Attempts - 1);
Q -> Q
end.
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 535725d585..71b74ea104 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,6 +31,7 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
+ ,list_queues_stopped/1
]).
all() ->
@@ -44,6 +45,7 @@ groups() ->
[list_queues_local
,list_queues_online
,list_queues_offline
+ ,list_queues_stopped
]}
].
@@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
-end_per_group(global_parameters, Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_, Config) ->
Config.
+init_per_testcase(list_queues_stopped, Config0) ->
+ %% Start node 3 to crash its queues
+ rabbit_ct_broker_helpers:start_node(Config0, 2),
+ %% Make vhost "down" on nodes 2 and 3
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),
+
+ rabbit_ct_broker_helpers:stop_node(Config0, 2),
+ rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped);
+
init_per_testcase(Testcase, Config0) ->
rabbit_ct_helpers:testcase_started(Config0, Testcase).
@@ -134,6 +142,23 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
+list_queues_stopped(Config) ->
+ Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
+ Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
+ Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))),
+
+ %% All queues are listed
+ ListedQueues =
+ [ {Name, State}
+ || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "state"]) ],
+
+ [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ],
+ %% Node is running. Vhost is down
+ [ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ],
+ %% Node is not running. Vhost is down
+ [ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ].
+
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index a519d01af5..6ed84dcfe3 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -41,7 +41,9 @@ groups() ->
vhost_failure_forces_connection_closure,
dead_vhost_connection_refused,
vhost_failure_forces_connection_closure_on_failure_node,
- dead_vhost_connection_refused_on_failure_node
+ dead_vhost_connection_refused_on_failure_node,
+ node_starts_with_dead_vhosts,
+ node_starts_with_dead_vhosts_and_ignore_slaves
],
[
{cluster_size_1_network, [], ClusterSize1Tests},
@@ -85,7 +87,7 @@ init_per_group(cluster_size_2_direct, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
init_per_multinode_group(cluster_size_2_direct, Config1, 2).
-init_per_multinode_group(Group, Config, NodeCount) ->
+init_per_multinode_group(_Group, Config, NodeCount) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, NodeCount},
@@ -107,9 +109,24 @@ init_per_testcase(Testcase, Config) ->
Config.
end_per_testcase(Testcase, Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+ case Testcase of
+ cluster_vhost_deletion_forces_connection_closure -> ok;
+ single_node_vhost_deletion_forces_connection_closure -> ok;
+ _ ->
+ delete_vhost(Config, VHost2)
+ end,
+ delete_vhost(Config, VHost1),
clear_all_connection_tracking_tables(Config),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+delete_vhost(Config, VHost) ->
+ case rabbit_ct_broker_helpers:delete_vhost(Config, VHost) of
+ ok -> ok;
+ {error, {no_such_vhost, _}} -> ok
+ end.
+
clear_all_connection_tracking_tables(Config) ->
[rabbit_ct_broker_helpers:rpc(Config,
N,
@@ -120,6 +137,7 @@ clear_all_connection_tracking_tables(Config) ->
%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------
+
single_node_vhost_deletion_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,
@@ -141,9 +159,7 @@ single_node_vhost_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
vhost_failure_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
@@ -161,15 +177,12 @@ vhost_failure_forces_connection_closure(Config) ->
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
dead_vhost_connection_refused(Config) ->
VHost1 = <<"vhost1">>,
@@ -181,7 +194,7 @@ dead_vhost_connection_refused(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
[_Conn1] = open_connections(Config, [{0, VHost1}]),
@@ -190,10 +203,7 @@ dead_vhost_connection_refused(Config) ->
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(0, count_connections_in(Config, VHost2)),
- expect_that_client_connection_is_rejected(Config, 0, VHost2),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ expect_that_client_connection_is_rejected(Config, 0, VHost2).
vhost_failure_forces_connection_closure_on_failure_node(Config) ->
@@ -213,7 +223,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
[_Conn21] = open_connections(Config, [{1, VHost2}]),
?assertEqual(2, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Vhost2 connection on node 1 is still alive
?assertEqual(1, count_connections_in(Config, VHost2)),
@@ -221,10 +231,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
?assertEqual(1, count_connections_in(Config, VHost1)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
dead_vhost_connection_refused_on_failure_node(Config) ->
VHost1 = <<"vhost1">>,
@@ -236,7 +243,7 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Can open connections to vhost1 on node 0 and 1
[_Conn10] = open_connections(Config, [{0, VHost1}]),
@@ -257,37 +264,6 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
-force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
-
-force_vhost_failure(Config, Node, VHost) ->
- force_vhost_failure(Config, Node, VHost, 10).
-
-force_vhost_failure(_Config, _Node, VHost, 0) ->
- error({failed_to_force_vhost_failure, no_more_attempts_left, VHost});
-force_vhost_failure(Config, Node, VHost, Attempts) ->
- MessageStorePid = get_message_store_pid(Config, VHost),
- rabbit_ct_broker_helpers:rpc(Config, Node,
- erlang, exit,
- [MessageStorePid, force_vhost_failure]),
- %% Give it a time to fail
- timer:sleep(200),
- case rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, is_vhost_alive,
- [VHost]) of
- true -> force_vhost_failure(Config, Node, VHost, Attempts - 1);
- false -> ok
- end.
-
-get_message_store_pid(Config, VHost) ->
- {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, vhost_sup, [VHost]),
- Children = rabbit_ct_broker_helpers:rpc(Config, 0,
- supervisor, which_children,
- [VHostSup]),
- [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children,
- Name == msg_store_persistent],
- MsgStorePid.
-
cluster_vhost_deletion_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,
@@ -309,9 +285,93 @@ cluster_vhost_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+%% Verifies that a node boots even when one vhost cannot be recovered.
+%% Declares a durable queue in vhost1 via node 1, stops node 1, overwrites
+%% recovery.dets in vhost1's message store directory with garbage and
+%% restarts the node. Afterwards vhost1 must be reported as not alive on
+%% node 1 while vhost2 is still alive.
+node_starts_with_dead_vhosts(Config) ->
+    VHost1 = <<"vhost1">>,
+    VHost2 = <<"vhost2">>,
+
+    set_up_vhost(Config, VHost1),
+    set_up_vhost(Config, VHost2),
+
+    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1, VHost1),
+    {ok, Chan} = amqp_connection:open_channel(Conn),
+
+    QName = <<"node_starts_with_dead_vhosts-q-1">>,
+    amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
+    rabbit_ct_client_helpers:publish(Chan, QName, 10),
+
+    DataStore1 = rabbit_ct_broker_helpers:rpc(
+        Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]),
+
+    rabbit_ct_broker_helpers:stop_node(Config, 1),
+
+    %% Assert the write succeeded: if it silently fails the store is not
+    %% corrupted and the assertions below fail for a misleading reason.
+    ok = file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>),
+
+    %% The node should start without a vhost
+    ok = rabbit_ct_broker_helpers:start_node(Config, 1),
+
+    timer:sleep(500),
+
+    false = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+    true = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
+
+%% Same scenario as a node starting with an unrecoverable vhost, but with a
+%% mirrored queue: the queue is declared via node 0 in vhost1, an "ha-mode:
+%% all" policy is set, and the test asserts that the single synchronised
+%% mirror lives on node 1. Node 1 is then stopped, recovery.dets in vhost1's
+%% message store directory is overwritten with garbage and the node is
+%% restarted. Afterwards vhost1 must be reported as not alive on node 1
+%% while vhost2 is still alive.
+node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
+    VHost1 = <<"vhost1">>,
+    VHost2 = <<"vhost2">>,
+
+    set_up_vhost(Config, VHost1),
+    set_up_vhost(Config, VHost2),
+
+    true = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+    true = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
+
+    Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
+    {ok, Chan} = amqp_connection:open_channel(Conn),
+
+    QName = <<"node_starts_with_dead_vhosts_and_ignore_slaves-q-0">>,
+    amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
+    ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+            rabbit_policy, set,
+            [VHost1, <<"mirror">>, <<".*">>, [{<<"ha-mode">>, <<"all">>}],
+             0, <<"queues">>, <<"acting-user">>]),
+
+    %% Wait for the queue to create a slave
+    timer:sleep(300),
+
+    rabbit_ct_client_helpers:publish(Chan, QName, 10),
+
+    {ok, Q} = rabbit_ct_broker_helpers:rpc(
+                Config, 0,
+                rabbit_amqqueue, lookup,
+                [rabbit_misc:r(VHost1, queue, QName)], infinity),
+
+    Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+    #amqqueue{sync_slave_pids = [Pid]} = Q,
+
+    %% The only synchronised mirror must be hosted on node 1
+    Node1 = node(Pid),
+
+    DataStore1 = rabbit_ct_broker_helpers:rpc(
+        Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]),
+
+    rabbit_ct_broker_helpers:stop_node(Config, 1),
+
+    %% Assert the write succeeded: if it silently fails the store is not
+    %% corrupted and the assertions below fail for a misleading reason.
+    ok = file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>),
+
+    %% The node should start without a vhost
+    ok = rabbit_ct_broker_helpers:start_node(Config, 1),
+
+    timer:sleep(500),
+
+    false = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+    true = rabbit_ct_broker_helpers:rpc(Config, 1,
+                rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
%% -------------------------------------------------------------------
%% Helpers