summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Klishin <michael@clojurewerkz.org> 2017-08-01 03:44:22 +0300
committer Michael Klishin <michael@clojurewerkz.org> 2017-08-01 03:44:22 +0300
commit 1342b04a177207c66d562e4d33610efcd3eac216 (patch)
tree cd584cbdce16720b82fd3cfb6ebc225d3d323cf4
parent 7bca93c7f931fbff8862077ea07b710004a4f12a (diff)
parent c58a15e7893bae019418c486d971046e879e0385 (diff)
download rabbitmq-server-git-1342b04a177207c66d562e4d33610efcd3eac216.tar.gz
Merge branch 'master' into rabbitmq-server-1314
Conflicts: test/dynamic_ha_SUITE.erl
-rw-r--r-- src/rabbit_amqqueue.erl 16
-rw-r--r-- src/rabbit_amqqueue_process.erl 13
-rw-r--r-- src/rabbit_amqqueue_sup_sup.erl 6
-rw-r--r-- src/rabbit_channel.erl 2
-rw-r--r-- src/rabbit_connection_tracking_handler.erl 19
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 20
-rw-r--r-- src/rabbit_recovery_terms.erl 4
-rw-r--r-- src/rabbit_vhost_limit.erl 7
-rw-r--r-- src/rabbit_vhost_msg_store.erl 6
-rw-r--r-- src/rabbit_vhost_process.erl 1
-rw-r--r-- src/rabbit_vhost_sup_sup.erl 60
-rw-r--r-- src/rabbit_vhost_sup_wrapper.erl 7
-rw-r--r-- test/dynamic_ha_SUITE.erl 19
-rw-r--r-- test/rabbitmqctl_integration_SUITE.erl 33
-rw-r--r-- test/vhost_SUITE.erl 166
15 files changed, 272 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ff57593374..5537634144 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,7 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
-export([pid_of/1, pid_of/2]).
+-export([mark_local_durable_queues_stopped/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -255,6 +256,15 @@ start(Qs) ->
[Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
ok.
+mark_local_durable_queues_stopped(VHost) ->
+ Qs = find_durable_queues(VHost),
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [ store_queue(Q#amqqueue{ state = stopped })
+ || Q = #amqqueue{ state = State } <- Qs,
+ State =/= stopped ]
+ end).
+
find_durable_queues(VHost) ->
Node = node(),
mnesia:async_dirty(
@@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) ->
E({absent, Q, timeout});
{ok, Q = #amqqueue{state = crashed}} ->
E({absent, Q, crashed});
+ {ok, Q = #amqqueue{state = stopped}} ->
+ %% The queue process was stopped by the supervisor
+ E({absent, Q, stopped});
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped);
info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
+info(Q = #amqqueue{ state = stopped }, Items) ->
+ info_down(Q, Items, stopped);
info(#amqqueue{ pid = QPid }, Items) ->
case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e43104de2..678f1136c3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
notify_decorators(startup, State3),
State3.
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+ terminate_shutdown(
+ fun (BQS) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ Q2 = Q#amqqueue{state = stopped},
+ rabbit_amqqueue:store_queue(Q2)
+ end),
+ BQ:terminate(R, BQS)
+ end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index b5ef86255d..f0bcbd7c60 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -57,7 +57,7 @@ find_for_vhost(VHost) ->
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
@@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
supervisor2:start_child(
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 00a6607dfb..c69a27d57c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
fun (not_found) -> {ok, 0};
({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
{ok, 0};
+ ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
end) of
{error, in_use} ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 3ae17677e0..ca13700da0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
- " because the vhost database has stopped working",
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
ok.
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- rabbit_networking:close_connection(Pid, Message);
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could have been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index dab98c740e..725cd7d089 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -231,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup_sup:start_queue_process(
- MirrorNode, Q, slave),
- log_info(QName, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
+ {ok, _} ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ {error, Error} ->
+ log_warning(QName,
+ "Unable to start queue mirror on node '~p'. "
+ "Target virtual host is not running: ~p~n",
+ [MirrorNode, Error]),
+ ok
+ end
end);
{error, not_found} = E ->
E
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 73fc9c7449..b73f3add7c 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,7 +48,7 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
{ok, _} = supervisor2:start_child(
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
ok.
stop(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
case supervisor:terminate_child(VHostSup, ?MODULE) of
ok -> supervisor:delete_child(VHostSup, ?MODULE);
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 7b797e46b2..9d8a6795b4 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
- update_vhost(VHost, undefined).
+ %% If the function is called as part of vhost deletion, the vhost may
+ %% already have been deleted.
+ case rabbit_vhost:exists(VHost) of
+ true -> update_vhost(VHost, undefined);
+ false -> ok
+ end.
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 3c633875bc..b9af37c258 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,7 +23,7 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
end.
stop(VHost, Type) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, Type),
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
end.
vhost_store_pid(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
case supervisor2:find_child(VHostSup, Type) of
[Pid] -> Pid;
[] -> no_pid
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
index e3c815a727..f6e4a83daa 100644
--- a/src/rabbit_vhost_process.erl
+++ b/src/rabbit_vhost_process.erl
@@ -55,6 +55,7 @@ init([VHost]) ->
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
+ rabbit_amqqueue:mark_local_durable_queues_stopped(VHost),
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 1d5db93fda..93c26d4e0f 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,7 +23,7 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
-export([start_on_all_nodes/1]).
@@ -72,7 +72,7 @@ delete_on_all_nodes(VHost) ->
ok.
stop_and_delete_vhost(VHost) ->
- case get_vhost_sup(VHost) of
+ StopResult = case lookup_vhost_sup_record(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +84,15 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete(?MODULE, VHost),
- ok = rabbit_vhost:delete_storage(VHost);
+ true = ets:delete(?MODULE, VHost),
+ ok;
Other ->
Other
end
end
- end.
+ end,
+ ok = rabbit_vhost:delete_storage(VHost),
+ StopResult.
%% We take an optimistic approach when stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,7 +108,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec init_vhost(rabbit_types:vhost()) -> ok.
+-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhost()}}.
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
@@ -130,30 +132,32 @@ init_vhost(VHost) ->
end
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
-vhost_sup(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
+ {vhost_supervisor_not_running, rabbit_types:vhost()}.
+
+-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
+get_vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
+ {error, Err} ->
+ {error, Err};
{badrpc, RpcErr} ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
-vhost_sup(VHost) ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case start_vhost(VHost) of
- {ok, Pid} ->
- true = is_vhost_alive(VHost),
- {ok, Pid};
- {error, {no_such_vhost, VHost}} ->
- {error, {no_such_vhost, VHost}};
- Error ->
- throw(Error)
- end;
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
+-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
+get_vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false ->
+ {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ {error, {vhost_supervisor_not_running, VHost}};
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
end.
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
@@ -181,7 +185,7 @@ start_vhost(VHost) ->
is_vhost_alive(VHost) ->
%% A vhost is considered alive if its supervision tree is alive and
%% saved in the ETS table
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
@@ -210,8 +214,8 @@ save_vhost_process(VHost, VHostProcessPid) ->
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.
--spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
-get_vhost_sup(VHost) ->
+-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+lookup_vhost_sup_record(VHost) ->
case ets:lookup(?MODULE, VHost) of
[] -> not_found;
[#vhost_sup{} = VHostSup] -> VHostSup
@@ -219,7 +223,7 @@ get_vhost_sup(VHost) ->
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
not_found ->
no_pid;
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8e23389bb9..4ae68cdd75 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -29,7 +29,12 @@
start_link(VHost) ->
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
- supervisor:start_link(?MODULE, [VHost]).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
+ {ok, Pid} ->
+ {error, {already_started, Pid}};
+ {error, _} ->
+ supervisor:start_link(?MODULE, [VHost])
+ end.
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index f1ff2f6685..3a4146fcab 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -61,13 +61,15 @@ groups() ->
slave_recovers_after_vhost_failure,
slave_recovers_after_vhost_down_an_up,
master_migrates_on_vhost_down,
- slave_recovers_after_vhost_down_and_master_migrated
+ slave_recovers_after_vhost_down_and_master_migrated,
+ queue_survive_adding_dead_vhost_mirror
]},
{cluster_size_3, [], [
change_policy,
rapid_change,
nodes_policy_should_pick_master_from_its_params,
- promote_slave_after_standalone_restart
+ promote_slave_after_standalone_restart,
+ queue_survive_adding_dead_vhost_mirror
% FIXME: Re-enable those tests when the known issues are
% fixed.
% failing_random_policies,
@@ -222,6 +224,19 @@ rapid_loop(Config, Node, MRef) ->
rapid_loop(Config, Node, MRef)
end.
+queue_survive_adding_dead_vhost_mirror(Config) ->
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 1, <<"/">>),
+ NodeA = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA),
+ QName = <<"queue_survive_adding_dead_vhost_mirror-q-1">>,
+ amqp_channel:call(ChA, #'queue.declare'{queue = QName}),
+ Q = find_queue(QName, NodeA),
+ Pid = proplists:get_value(pid, Q),
+ rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+ %% Queue should not fail
+ Q1 = find_queue(QName, NodeA),
+ Pid = proplists:get_value(pid, Q1).
+
%% Vhost deletion needs to successfully tear down policies and queues
%% with policies. At least smoke-test that it doesn't blow up.
vhost_deletion(Config) ->
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 535725d585..71b74ea104 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -31,6 +31,7 @@
-export([list_queues_local/1
,list_queues_offline/1
,list_queues_online/1
+ ,list_queues_stopped/1
]).
all() ->
@@ -44,6 +45,7 @@ groups() ->
[list_queues_local
,list_queues_online
,list_queues_offline
+ ,list_queues_stopped
]}
].
@@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) ->
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
-end_per_group(global_parameters, Config) ->
- rabbit_ct_helpers:run_teardown_steps(Config,
- rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps());
end_per_group(_, Config) ->
Config.
+init_per_testcase(list_queues_stopped, Config0) ->
+ %% Start node 3 to crash its queues
+ rabbit_ct_broker_helpers:start_node(Config0, 2),
+ %% Make vhost "down" on nodes 2 and 3
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),
+
+ rabbit_ct_broker_helpers:stop_node(Config0, 2),
+ rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped);
+
init_per_testcase(Testcase, Config0) ->
rabbit_ct_helpers:testcase_started(Config0, Testcase).
@@ -134,6 +142,23 @@ list_queues_offline(Config) ->
assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues),
ok.
+list_queues_stopped(Config) ->
+ Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))),
+ Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))),
+ Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))),
+
+ %% All queues are listed
+ ListedQueues =
+ [ {Name, State}
+ || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "state"]) ],
+
+ [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ],
+ %% Node is running. Vhost is down
+ [ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ],
+ %% Node is not running. Vhost is down
+ [ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ].
+
%%----------------------------------------------------------------------------
%% Helpers
%%----------------------------------------------------------------------------
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index fd613a2f1c..6ed84dcfe3 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -41,7 +41,9 @@ groups() ->
vhost_failure_forces_connection_closure,
dead_vhost_connection_refused,
vhost_failure_forces_connection_closure_on_failure_node,
- dead_vhost_connection_refused_on_failure_node
+ dead_vhost_connection_refused_on_failure_node,
+ node_starts_with_dead_vhosts,
+ node_starts_with_dead_vhosts_and_ignore_slaves
],
[
{cluster_size_1_network, [], ClusterSize1Tests},
@@ -107,9 +109,24 @@ init_per_testcase(Testcase, Config) ->
Config.
end_per_testcase(Testcase, Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+ case Testcase of
+ cluster_vhost_deletion_forces_connection_closure -> ok;
+ single_node_vhost_deletion_forces_connection_closure -> ok;
+ _ ->
+ delete_vhost(Config, VHost2)
+ end,
+ delete_vhost(Config, VHost1),
clear_all_connection_tracking_tables(Config),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+delete_vhost(Config, VHost) ->
+ case rabbit_ct_broker_helpers:delete_vhost(Config, VHost) of
+ ok -> ok;
+ {error, {no_such_vhost, _}} -> ok
+ end.
+
clear_all_connection_tracking_tables(Config) ->
[rabbit_ct_broker_helpers:rpc(Config,
N,
@@ -120,6 +137,7 @@ clear_all_connection_tracking_tables(Config) ->
%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------
+
single_node_vhost_deletion_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,
@@ -141,9 +159,7 @@ single_node_vhost_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
vhost_failure_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
@@ -161,15 +177,12 @@ vhost_failure_forces_connection_closure(Config) ->
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(1, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
dead_vhost_connection_refused(Config) ->
VHost1 = <<"vhost1">>,
@@ -181,7 +194,7 @@ dead_vhost_connection_refused(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2),
timer:sleep(200),
[_Conn1] = open_connections(Config, [{0, VHost1}]),
@@ -190,10 +203,7 @@ dead_vhost_connection_refused(Config) ->
[_Conn2] = open_connections(Config, [{0, VHost2}]),
?assertEqual(0, count_connections_in(Config, VHost2)),
- expect_that_client_connection_is_rejected(Config, 0, VHost2),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ expect_that_client_connection_is_rejected(Config, 0, VHost2).
vhost_failure_forces_connection_closure_on_failure_node(Config) ->
@@ -213,7 +223,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
[_Conn21] = open_connections(Config, [{1, VHost2}]),
?assertEqual(2, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Vhost2 connection on node 1 is still alive
?assertEqual(1, count_connections_in(Config, VHost2)),
@@ -221,10 +231,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) ->
?assertEqual(1, count_connections_in(Config, VHost1)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
dead_vhost_connection_refused_on_failure_node(Config) ->
VHost1 = <<"vhost1">>,
@@ -236,7 +243,7 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
?assertEqual(0, count_connections_in(Config, VHost1)),
?assertEqual(0, count_connections_in(Config, VHost2)),
- force_vhost_failure(Config, 0, VHost2),
+ rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2),
timer:sleep(200),
%% Can open connections to vhost1 on node 0 and 1
[_Conn10] = open_connections(Config, [{0, VHost1}]),
@@ -257,37 +264,6 @@ dead_vhost_connection_refused_on_failure_node(Config) ->
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
-force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
-
-force_vhost_failure(Config, Node, VHost) ->
- force_vhost_failure(Config, Node, VHost, 10).
-
-force_vhost_failure(_Config, _Node, VHost, 0) ->
- error({failed_to_force_vhost_failure, no_more_attempts_left, VHost});
-force_vhost_failure(Config, Node, VHost, Attempts) ->
- MessageStorePid = get_message_store_pid(Config, VHost),
- rabbit_ct_broker_helpers:rpc(Config, Node,
- erlang, exit,
- [MessageStorePid, force_vhost_failure]),
- %% Give it a time to fail
- timer:sleep(200),
- case rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, is_vhost_alive,
- [VHost]) of
- true -> force_vhost_failure(Config, Node, VHost, Attempts - 1);
- false -> ok
- end.
-
-get_message_store_pid(Config, VHost) ->
- {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0,
- rabbit_vhost_sup_sup, vhost_sup, [VHost]),
- Children = rabbit_ct_broker_helpers:rpc(Config, 0,
- supervisor, which_children,
- [VHostSup]),
- [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children,
- Name == msg_store_persistent],
- MsgStorePid.
-
cluster_vhost_deletion_forces_connection_closure(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,
@@ -309,9 +285,93 @@ cluster_vhost_deletion_forces_connection_closure(Config) ->
?assertEqual(0, count_connections_in(Config, VHost2)),
close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost1)).
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+node_starts_with_dead_vhosts(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1, VHost1),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+
+ QName = <<"node_starts_with_dead_vhosts-q-1">>,
+ amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
+ rabbit_ct_client_helpers:publish(Chan, QName, 10),
+
+ DataStore1 = rabbit_ct_broker_helpers:rpc(
+ Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]),
+
+ rabbit_ct_broker_helpers:stop_node(Config, 1),
+
+ file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>),
+
+ %% The node should start without a vhost
+ ok = rabbit_ct_broker_helpers:start_node(Config, 1),
+
+ timer:sleep(500),
+
+ false = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+ true = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
+
+node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ true = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+ true = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]),
+
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+
+ QName = <<"node_starts_with_dead_vhosts_and_ignore_slaves-q-0">>,
+ amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_policy, set,
+ [VHost1, <<"mirror">>, <<".*">>, [{<<"ha-mode">>, <<"all">>}],
+ 0, <<"queues">>, <<"acting-user">>]),
+
+ %% Wait for the queue to create a slave
+ timer:sleep(300),
+
+ rabbit_ct_client_helpers:publish(Chan, QName, 10),
+
+ {ok, Q} = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_amqqueue, lookup,
+ [rabbit_misc:r(VHost1, queue, QName)], infinity),
+
+ Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+ #amqqueue{sync_slave_pids = [Pid]} = Q,
+
+ Node1 = node(Pid),
+
+ DataStore1 = rabbit_ct_broker_helpers:rpc(
+ Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]),
+
+ rabbit_ct_broker_helpers:stop_node(Config, 1),
+
+ file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>),
+
+ %% The node should start without a vhost
+ ok = rabbit_ct_broker_helpers:start_node(Config, 1),
+
+ timer:sleep(500),
+
+ false = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]),
+ true = rabbit_ct_broker_helpers:rpc(Config, 1,
+ rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]).
%% -------------------------------------------------------------------
%% Helpers