diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-08-01 03:44:22 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-08-01 03:44:22 +0300 |
| commit | 1342b04a177207c66d562e4d33610efcd3eac216 (patch) | |
| tree | cd584cbdce16720b82fd3cfb6ebc225d3d323cf4 | |
| parent | 7bca93c7f931fbff8862077ea07b710004a4f12a (diff) | |
| parent | c58a15e7893bae019418c486d971046e879e0385 (diff) | |
| download | rabbitmq-server-git-1342b04a177207c66d562e4d33610efcd3eac216.tar.gz | |
Merge branch 'master' into rabbitmq-server-1314
Conflicts:
test/dynamic_ha_SUITE.erl
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 7 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 33 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 166 |
15 files changed, 272 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ff57593374..5537634144 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,6 +40,7 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). -export([pid_of/1, pid_of/2]). +-export([mark_local_durable_queues_stopped/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -255,6 +256,15 @@ start(Qs) -> [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], ok. +mark_local_durable_queues_stopped(VHost) -> + Qs = find_durable_queues(VHost), + rabbit_misc:execute_mnesia_transaction( + fun() -> + [ store_queue(Q#amqqueue{ state = stopped }) + || Q = #amqqueue{ state = State } <- Qs, + State =/= stopped ] + end). + find_durable_queues(VHost) -> Node = node(), mnesia:async_dirty( @@ -452,6 +462,9 @@ with(Name, F, E, RetriesLeft) -> E({absent, Q, timeout}); {ok, Q = #amqqueue{state = crashed}} -> E({absent, Q, crashed}); + {ok, Q = #amqqueue{state = stopped}} -> + %% The queue process was stopped by the supervisor + E({absent, Q, stopped}); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -642,10 +655,13 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(Q = #amqqueue{ state = stopped }) -> info_down(Q, stopped); info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). 
info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); +info(Q = #amqqueue{ state = stopped }, Items) -> + info_down(Q, Items, stopped); info(#amqqueue{ pid = QPid }, Items) -> case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4e43104de2..678f1136c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -265,9 +265,18 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, notify_decorators(startup, State3), State3. -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> +terminate(shutdown = R, State = #q{backing_queue = BQ, q = #amqqueue{ name = QName }}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + [Q] = mnesia:read({rabbit_queue, QName}), + Q2 = Q#amqqueue{state = stopped}, + rabbit_amqqueue:store_queue(Q2) + end), + BQ:terminate(R, BQS) + end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index b5ef86255d..f0bcbd7c60 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -57,7 +57,7 @@ find_for_vhost(VHost) -> -spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}. 
find_for_vhost(VHost, Node) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node), + {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node), case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of [QSup] -> {ok, QSup}; Result -> {error, {queue_supervisor_not_found, Result}} @@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) -> -spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> supervisor2:start_child( VHostSup, @@ -82,7 +82,7 @@ start_for_vhost(VHost) -> -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 00a6607dfb..c69a27d57c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2142,6 +2142,8 @@ handle_method(#'queue.delete'{queue = QueueNameBin, fun (not_found) -> {ok, 0}; ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), {ok, 0}; + ({absent, Q, stopped}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index 3ae17677e0..ca13700da0 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +%% Note: under normal circumstances this will be called 
immediately +%% after the vhost_deleted above. Therefore we should be careful about +%% what we log and be more defensive. handle_event(#event{type = vhost_down, props = Details}, State) -> VHost = pget(name, Details), Node = pget(node, Details), - rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" - " because the vhost database has stopped working", + rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" + " because the vhost is stopping", [VHost, Node]), close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), rabbit_misc:format("vhost '~s' is down", [VHost])), @@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) -> ok. close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> - rabbit_networking:close_connection(Pid, Message); + try + rabbit_networking:close_connection(Pid, Message) + catch error:{not_a_connection, _} -> + %% could have been closed concurrently, or the input + %% is bogus. In any case, we should not terminate + ok; + _:Err -> + %% ignore, don't terminate + rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]), + ok + end; close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. 
Node = node(Pid), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index dab98c740e..725cd7d089 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -231,11 +231,21 @@ add_mirror(QName, MirrorNode, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup_sup:start_queue_process( - MirrorNode, Q, slave), - log_info(QName, "Adding mirror on node ~p: ~p~n", - [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) + #amqqueue{name = #resource{virtual_host = VHost}} = Q, + case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of + {ok, _} -> + SPid = rabbit_amqqueue_sup_sup:start_queue_process( + MirrorNode, Q, slave), + log_info(QName, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode); + {error, Error} -> + log_warning(QName, + "Unable to start queue mirror on node '~p'. " + "Target virtual host is not running: ~p~n", + [MirrorNode, Error]), + ok + end end); {error, not_found} = E -> E diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 73fc9c7449..b73f3add7c 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,7 +48,7 @@ %%---------------------------------------------------------------------------- start(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> {ok, _} = supervisor2:start_child( VHostSup, @@ -65,7 +65,7 @@ start(VHost) -> ok. 
stop(VHost) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> case supervisor:terminate_child(VHostSup, ?MODULE) of ok -> supervisor:delete_child(VHostSup, ?MODULE); diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index 7b797e46b2..9d8a6795b4 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) -> notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) -> rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}, {user_who_performed_action, ActingUser}]), - update_vhost(VHost, undefined). + %% If the function is called as a part of vhost deletion, the vhost can + %% be already deleted. + case rabbit_vhost:exists(VHost) of + true -> update_vhost(VHost, undefined); + false -> ok + end. connection_limit(VirtualHost) -> get_limit(VirtualHost, <<"max-connections">>). diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 3c633875bc..b9af37c258 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,7 +23,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), supervisor2:start_child(VHostSup, @@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); end. stop(VHost, Type) -> - case rabbit_vhost_sup_sup:vhost_sup(VHost) of + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of {ok, VHostSup} -> ok = supervisor2:terminate_child(VHostSup, Type), ok = supervisor2:delete_child(VHostSup, Type); @@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) -> end. 
vhost_store_pid(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost), case supervisor2:find_child(VHostSup, Type) of [Pid] -> Pid; [] -> no_pid diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl index e3c815a727..f6e4a83daa 100644 --- a/src/rabbit_vhost_process.erl +++ b/src/rabbit_vhost_process.erl @@ -55,6 +55,7 @@ init([VHost]) -> timer:send_interval(Interval, check_vhost), {ok, VHost} catch _:Reason -> + rabbit_amqqueue:mark_local_durable_queues_stopped(VHost), rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" " Stacktrace ~p", [VHost, Reason, erlang:get_stacktrace()]), diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 1d5db93fda..93c26d4e0f 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,7 +23,7 @@ -export([init/1]). -export([start_link/0, start/0]). --export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). -export([start_on_all_nodes/1]). @@ -72,7 +72,7 @@ delete_on_all_nodes(VHost) -> ok. stop_and_delete_vhost(VHost) -> - case get_vhost_sup(VHost) of + StopResult = case lookup_vhost_sup_record(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, vhost_sup_pid = VHostSupPid} -> @@ -84,13 +84,15 @@ [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete(?MODULE, VHost), - ok = rabbit_vhost:delete_storage(VHost); + true = ets:delete(?MODULE, VHost), + ok; Other -> Other end end - end. + end, + ok = rabbit_vhost:delete_storage(VHost), + StopResult. %% We take an optimistic approach when stopping a remote VHost supervisor. stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> @@ -106,7 +108,7 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. 
--spec init_vhost(rabbit_types:vhost()) -> ok. +-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhost()}}. init_vhost(VHost) -> case start_vhost(VHost) of {ok, _} -> ok; @@ -130,30 +132,32 @@ init_vhost(VHost) -> end end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. -vhost_sup(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of +-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} | + {vhost_supervisor_not_running, rabbit_types:vhost()}. + +-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}. +get_vhost_sup(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; + {error, Err} -> + {error, Err}; {badrpc, RpcErr} -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. -vhost_sup(VHost) -> - case vhost_sup_pid(VHost) of - no_pid -> - case start_vhost(VHost) of - {ok, Pid} -> - true = is_vhost_alive(VHost), - {ok, Pid}; - {error, {no_such_vhost, VHost}} -> - {error, {no_such_vhost, VHost}}; - Error -> - throw(Error) - end; - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} +-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}. +get_vhost_sup(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> + {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + {error, {vhost_supervisor_not_running, VHost}}; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end end. -spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. 
@@ -181,7 +185,7 @@ start_vhost(VHost) -> is_vhost_alive(VHost) -> %% A vhost is considered alive if its supervision tree is alive and %% saved in the ETS table - case get_vhost_sup(VHost) of + case lookup_vhost_sup_record(VHost) of #vhost_sup{wrapper_pid = WrapperPid, vhost_sup_pid = VHostSupPid, vhost_process_pid = VHostProcessPid} @@ -210,8 +214,8 @@ save_vhost_process(VHost, VHostProcessPid) -> {#vhost_sup.vhost_process_pid, VHostProcessPid}), ok. --spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. -get_vhost_sup(VHost) -> +-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found. +lookup_vhost_sup_record(VHost) -> case ets:lookup(?MODULE, VHost) of [] -> not_found; [#vhost_sup{} = VHostSup] -> VHostSup @@ -219,7 +223,7 @@ -spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}. vhost_sup_pid(VHost) -> - case get_vhost_sup(VHost) of + case lookup_vhost_sup_record(VHost) of not_found -> no_pid; #vhost_sup{vhost_sup_pid = Pid} = VHostSup -> diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 8e23389bb9..4ae68cdd75 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -29,7 +29,12 @@ start_link(VHost) -> %% Using supervisor, because supervisor2 does not stop a started child when %% another one fails to start. Bug? - supervisor:start_link(?MODULE, [VHost]). + case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of + {ok, Pid} -> + {error, {already_started, Pid}}; + {error, _} -> + supervisor:start_link(?MODULE, [VHost]) + end. init([VHost]) -> %% 2 restarts in 5 minutes. One per message store. 
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index f1ff2f6685..3a4146fcab 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -61,13 +61,15 @@ groups() -> slave_recovers_after_vhost_failure, slave_recovers_after_vhost_down_an_up, master_migrates_on_vhost_down, - slave_recovers_after_vhost_down_and_master_migrated + slave_recovers_after_vhost_down_and_master_migrated, + queue_survive_adding_dead_vhost_mirror ]}, {cluster_size_3, [], [ change_policy, rapid_change, nodes_policy_should_pick_master_from_its_params, - promote_slave_after_standalone_restart + promote_slave_after_standalone_restart, + queue_survive_adding_dead_vhost_mirror % FIXME: Re-enable those tests when the known issues are % fixed. % failing_random_policies, @@ -222,6 +224,19 @@ rapid_loop(Config, Node, MRef) -> rapid_loop(Config, Node, MRef) end. +queue_survive_adding_dead_vhost_mirror(Config) -> + rabbit_ct_broker_helpers:force_vhost_failure(Config, 1, <<"/">>), + NodeA = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + ChA = rabbit_ct_client_helpers:open_channel(Config, NodeA), + QName = <<"queue_survive_adding_dead_vhost_mirror-q-1">>, + amqp_channel:call(ChA, #'queue.declare'{queue = QName}), + Q = find_queue(QName, NodeA), + Pid = proplists:get_value(pid, Q), + rabbit_ct_broker_helpers:set_ha_policy_all(Config), + %% Queue should not fail + Q1 = find_queue(QName, NodeA), + Pid = proplists:get_value(pid, Q1). + %% Vhost deletion needs to successfully tear down policies and queues %% with policies. At least smoke-test that it doesn't blow up. vhost_deletion(Config) -> diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index 535725d585..71b74ea104 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -31,6 +31,7 @@ -export([list_queues_local/1 ,list_queues_offline/1 ,list_queues_online/1 + ,list_queues_stopped/1 ]). 
all() -> @@ -44,6 +45,7 @@ groups() -> [list_queues_local ,list_queues_online ,list_queues_offline + ,list_queues_stopped ]} ]. @@ -96,13 +98,19 @@ end_per_group(list_queues, Config0) -> rabbit_ct_helpers:run_steps(Config1, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()); -end_per_group(global_parameters, Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()); end_per_group(_, Config) -> Config. +init_per_testcase(list_queues_stopped, Config0) -> + %% Start node 3 to crash its queues + rabbit_ct_broker_helpers:start_node(Config0, 2), + %% Make vhost "down" on nodes 2 and 3 + rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>), + rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>), + + rabbit_ct_broker_helpers:stop_node(Config0, 2), + rabbit_ct_helpers:testcase_started(Config0, list_queues_stopped); + init_per_testcase(Testcase, Config0) -> rabbit_ct_helpers:testcase_started(Config0, Testcase). @@ -134,6 +142,23 @@ list_queues_offline(Config) -> assert_ctl_queues(Config, 1, ["--offline"], OfflineQueues), ok. +list_queues_stopped(Config) -> + Node1Queues = lists:sort(lists:nth(1, ?config(per_node_queues, Config))), + Node2Queues = lists:sort(lists:nth(2, ?config(per_node_queues, Config))), + Node3Queues = lists:sort(lists:nth(3, ?config(per_node_queues, Config))), + + %% All queues are listed + ListedQueues = + [ {Name, State} + || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "state"]) ], + + [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ], + %% Node is running. Vhost is down + [ <<"stopped">> = proplists:get_value(Q, ListedQueues) || Q <- Node2Queues ], + %% Node is not running. Vhost is down + [ <<"down">> = proplists:get_value(Q, ListedQueues) || Q <- Node3Queues ]. 
+ %%---------------------------------------------------------------------------- %% Helpers %%---------------------------------------------------------------------------- diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index fd613a2f1c..6ed84dcfe3 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -41,7 +41,9 @@ groups() -> vhost_failure_forces_connection_closure, dead_vhost_connection_refused, vhost_failure_forces_connection_closure_on_failure_node, - dead_vhost_connection_refused_on_failure_node + dead_vhost_connection_refused_on_failure_node, + node_starts_with_dead_vhosts, + node_starts_with_dead_vhosts_and_ignore_slaves ], [ {cluster_size_1_network, [], ClusterSize1Tests}, @@ -107,9 +109,24 @@ init_per_testcase(Testcase, Config) -> Config. end_per_testcase(Testcase, Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + case Testcase of + cluster_vhost_deletion_forces_connection_closure -> ok; + single_node_vhost_deletion_forces_connection_closure -> ok; + _ -> + delete_vhost(Config, VHost2) + end, + delete_vhost(Config, VHost1), clear_all_connection_tracking_tables(Config), rabbit_ct_helpers:testcase_finished(Config, Testcase). +delete_vhost(Config, VHost) -> + case rabbit_ct_broker_helpers:delete_vhost(Config, VHost) of + ok -> ok; + {error, {no_such_vhost, _}} -> ok + end. + clear_all_connection_tracking_tables(Config) -> [rabbit_ct_broker_helpers:rpc(Config, N, @@ -120,6 +137,7 @@ clear_all_connection_tracking_tables(Config) -> %% ------------------------------------------------------------------- %% Test cases. 
%% ------------------------------------------------------------------- + single_node_vhost_deletion_forces_connection_closure(Config) -> VHost1 = <<"vhost1">>, VHost2 = <<"vhost2">>, @@ -141,9 +159,7 @@ single_node_vhost_deletion_forces_connection_closure(Config) -> ?assertEqual(0, count_connections_in(Config, VHost2)), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + ?assertEqual(0, count_connections_in(Config, VHost1)). vhost_failure_forces_connection_closure(Config) -> VHost1 = <<"vhost1">>, @@ -161,15 +177,12 @@ vhost_failure_forces_connection_closure(Config) -> [_Conn2] = open_connections(Config, [{0, VHost2}]), ?assertEqual(1, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2), timer:sleep(200), ?assertEqual(0, count_connections_in(Config, VHost2)), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + ?assertEqual(0, count_connections_in(Config, VHost1)). dead_vhost_connection_refused(Config) -> VHost1 = <<"vhost1">>, @@ -181,7 +194,7 @@ dead_vhost_connection_refused(Config) -> ?assertEqual(0, count_connections_in(Config, VHost1)), ?assertEqual(0, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, VHost2), timer:sleep(200), [_Conn1] = open_connections(Config, [{0, VHost1}]), @@ -190,10 +203,7 @@ dead_vhost_connection_refused(Config) -> [_Conn2] = open_connections(Config, [{0, VHost2}]), ?assertEqual(0, count_connections_in(Config, VHost2)), - expect_that_client_connection_is_rejected(Config, 0, VHost2), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). 
+ expect_that_client_connection_is_rejected(Config, 0, VHost2). vhost_failure_forces_connection_closure_on_failure_node(Config) -> @@ -213,7 +223,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) -> [_Conn21] = open_connections(Config, [{1, VHost2}]), ?assertEqual(2, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, 0, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2), timer:sleep(200), %% Vhost2 connection on node 1 is still alive ?assertEqual(1, count_connections_in(Config, VHost2)), @@ -221,10 +231,7 @@ vhost_failure_forces_connection_closure_on_failure_node(Config) -> ?assertEqual(1, count_connections_in(Config, VHost1)), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + ?assertEqual(0, count_connections_in(Config, VHost1)). dead_vhost_connection_refused_on_failure_node(Config) -> VHost1 = <<"vhost1">>, @@ -236,7 +243,7 @@ dead_vhost_connection_refused_on_failure_node(Config) -> ?assertEqual(0, count_connections_in(Config, VHost1)), ?assertEqual(0, count_connections_in(Config, VHost2)), - force_vhost_failure(Config, 0, VHost2), + rabbit_ct_broker_helpers:force_vhost_failure(Config, 0, VHost2), timer:sleep(200), %% Can open connections to vhost1 on node 0 and 1 [_Conn10] = open_connections(Config, [{0, VHost1}]), @@ -257,37 +264,6 @@ dead_vhost_connection_refused_on_failure_node(Config) -> rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). -force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost). - -force_vhost_failure(Config, Node, VHost) -> - force_vhost_failure(Config, Node, VHost, 10). 
- -force_vhost_failure(_Config, _Node, VHost, 0) -> - error({failed_to_force_vhost_failure, no_more_attempts_left, VHost}); -force_vhost_failure(Config, Node, VHost, Attempts) -> - MessageStorePid = get_message_store_pid(Config, VHost), - rabbit_ct_broker_helpers:rpc(Config, Node, - erlang, exit, - [MessageStorePid, force_vhost_failure]), - %% Give it a time to fail - timer:sleep(200), - case rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_vhost_sup_sup, is_vhost_alive, - [VHost]) of - true -> force_vhost_failure(Config, Node, VHost, Attempts - 1); - false -> ok - end. - -get_message_store_pid(Config, VHost) -> - {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0, - rabbit_vhost_sup_sup, vhost_sup, [VHost]), - Children = rabbit_ct_broker_helpers:rpc(Config, 0, - supervisor, which_children, - [VHostSup]), - [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children, - Name == msg_store_persistent], - MsgStorePid. - cluster_vhost_deletion_forces_connection_closure(Config) -> VHost1 = <<"vhost1">>, VHost2 = <<"vhost2">>, @@ -309,9 +285,93 @@ cluster_vhost_deletion_forces_connection_closure(Config) -> ?assertEqual(0, count_connections_in(Config, VHost2)), close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost1)). - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). 
+node_starts_with_dead_vhosts(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1, VHost1), + {ok, Chan} = amqp_connection:open_channel(Conn), + + QName = <<"node_starts_with_dead_vhosts-q-1">>, + amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}), + rabbit_ct_client_helpers:publish(Chan, QName, 10), + + DataStore1 = rabbit_ct_broker_helpers:rpc( + Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]), + + rabbit_ct_broker_helpers:stop_node(Config, 1), + + file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>), + + %% The node should start without a vhost + ok = rabbit_ct_broker_helpers:start_node(Config, 1), + + timer:sleep(500), + + false = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), + true = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]). 
+ +node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + true = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), + true = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]), + + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost1), + {ok, Chan} = amqp_connection:open_channel(Conn), + + QName = <<"node_starts_with_dead_vhosts_and_ignore_slaves-q-0">>, + amqp_channel:call(Chan, #'queue.declare'{queue = QName, durable = true}), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_policy, set, + [VHost1, <<"mirror">>, <<".*">>, [{<<"ha-mode">>, <<"all">>}], + 0, <<"queues">>, <<"acting-user">>]), + + %% Wait for the queue to create a slave + timer:sleep(300), + + rabbit_ct_client_helpers:publish(Chan, QName, 10), + + {ok, Q} = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_amqqueue, lookup, + [rabbit_misc:r(VHost1, queue, QName)], infinity), + + Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + #amqqueue{sync_slave_pids = [Pid]} = Q, + + Node1 = node(Pid), + + DataStore1 = rabbit_ct_broker_helpers:rpc( + Config, 1, rabbit_vhost, msg_store_dir_path, [VHost1]), + + rabbit_ct_broker_helpers:stop_node(Config, 1), + + file:write_file(filename:join(DataStore1, "recovery.dets"), <<"garbage">>), + + %% The node should start without a vhost + ok = rabbit_ct_broker_helpers:start_node(Config, 1), + + timer:sleep(500), + + false = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost1]), + true = rabbit_ct_broker_helpers:rpc(Config, 1, + rabbit_vhost_sup_sup, is_vhost_alive, [VHost2]). %% ------------------------------------------------------------------- %% Helpers |
