diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2017-07-25 17:15:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-25 17:15:38 +0100 |
| commit | 6115475719cc01b582d8fa2440df23467e52bbe6 (patch) | |
| tree | f59e7f41c536407e300c7ae7945b86069d9f0b63 | |
| parent | e66809bbcd33593de73866dac6718726ef9053d8 (diff) | |
| parent | dc82c258e9a51a7b7a2f9fa45e31e09c4c614fa3 (diff) | |
| download | rabbitmq-server-git-6115475719cc01b582d8fa2440df23467e52bbe6.tar.gz | |
Merge pull request #1293 from rabbitmq/rabbitmq-server-close-connection-on-vhost-down
Close connections when vhost is unavailable (supervision tree is down)
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl (renamed from src/rabbit_vhost_sup_watcher.erl) | 42 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 146 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 27 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_SUITE.erl | 57 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 376 |
12 files changed, 613 insertions, 139 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index f8c4c6541b..27c4bff810 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -34,7 +34,7 @@ delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, clear_tracked_connection_tables_for_this_node/0, register_connection/1, unregister_connection/1, - list/0, list/1, list_on_node/1, list_of_user/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, count_connections_in/1]). @@ -217,6 +217,16 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. +-spec list_on_node(node(), rabbit_types:vhsot()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node, VHost) -> + try mnesia:dirty_match_object( + tracked_connection_table_name_for(Node), + #tracked_connection{vhost = VHost, _ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + + -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. 
list_of_user(Username) -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index f1b844c60c..3ae17677e0 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +handle_event(#event{type = vhost_down, props = Details}, State) -> + VHost = pget(name, Details), + Node = pget(node, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" + " because the vhost database has stopped working", + [VHost, Node]), + close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])), + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> Username = pget(name, Details), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 4b7f06305a..26e8f4d452 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> true -> {error, not_allowed}; false -> - case AuthFun() of - {ok, User = #user{username = Username}} -> - notify_auth_result(Username, - user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); - {refused, Username, Msg, Args} -> - notify_auth_result(Username, - user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}]), - {error, {auth_failure, "Refused"}} + case is_vhost_alive(VHost, Creds, Pid) of + false -> + {error, {internal_error, vhost_is_down}}; + true -> + case AuthFun() of + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), + connect1(User, VHost, 
Protocol, Pid, Infos); + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), + {error, {auth_failure, "Refused"}} + end end end; false -> {error, broker_not_found_on_node} @@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) -> [] end. +is_vhost_alive(VHost, {Username, _Password}, Pid) -> + PrintedUsername = case Username of + none -> ""; + _ -> Username + end, + case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of + true -> true; + false -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [Pid, VHost, PrintedUsername, VHost]), + false + end. is_over_connection_limit(VHost, {Username, _Password}, Pid) -> PrintedUsername = case Username of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 275a9127d1..fe78075d0f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, ok; {error, RTErr} -> rabbit_log:error("Unable to save message store recovery terms" - "for directory ~p~nError: ~p~n", + " for directory ~p~nError: ~p~n", [Dir, RTErr]) end, State3 #msstate { index_state = undefined, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e23d382d6e..77914a00bf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> State; -handle_other(heartbeat_timeout, +handle_other(heartbeat_timeout, State = #v1{connection = #connection{timeout_sec = T}}) -> maybe_emit_stats(State), throw({heartbeat_timeout, T}); @@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol, sock = Sock}, Reason) -> case 
rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> - + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, Protocol); _ -> @@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, ok = is_over_connection_limit(VHost, User), ok = rabbit_access_control:check_vhost_access(User, VHost, Sock), + ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +is_vhost_alive(VHostPath, User) -> + case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of + true -> ok; + false -> + rabbit_misc:protocol_error(internal_error, + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [VHostPath, User#user.username, VHostPath]) + end. + is_over_connection_limit(VHostPath, User) -> try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of false -> ok; @@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) -> State1 = State#v1{connection_state = blocked, throttle = update_last_blocked_at(Throttle)}, case CS of - running -> + running -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater); _ -> ok end, @@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) -> case should_send_unblocked(Throttle) of true -> ok = send_unblocked(State), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = false}}; false -> State end. 
@@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) -> case should_send_blocked(Throttle) of true -> ok = send_blocked(State, blocked_by_message(Throttle)), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = true}}; false -> maybe_send_unblocked(State) end. @@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS, running -> maybe_block(State1); %% unblock or re-enable blocking blocked -> maybe_block(maybe_unblock(State1)); - _ -> State1 + _ -> State1 end. augment_connection_log_name(#connection{client_properties = ClientProperties, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 7513c23925..df4812da49 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -26,6 +26,7 @@ -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). +-export([vhost_down/1]). -spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). @@ -54,8 +55,9 @@ recover() -> %% rabbit_vhost_sup_sup will start the actual recovery. %% So recovery will be run every time a vhost supervisor is restarted. ok = rabbit_vhost_sup_sup:start(), - [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost) - || VHost <- rabbit_vhost:list()], + + [ ok = rabbit_vhost_sup_sup:init_vhost(VHost) + || VHost <- rabbit_vhost:list()], ok. recover(VHost) -> @@ -144,6 +146,12 @@ delete(VHostPath, ActingUser) -> rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. +vhost_down(VHostPath) -> + ok = rabbit_event:notify(vhost_down, + [{name, VHostPath}, + {node, node()}, + {user_who_performed_action, ?INTERNAL_USER}]). 
+ delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_process.erl index be2c5f20bb..c16a24eb07 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_process.erl @@ -14,10 +14,21 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% -%% This module implements a watcher process which should stop -%% the parent supervisor if its vhost is missing from the mnesia DB +%% This module implements a vhost identity process. --module(rabbit_vhost_sup_watcher). +%% On start this process will try to recover the vhost data and +%% processes structure (queues and message stores). +%% If recovered successfully, the process will save its PID +%% to vhost process registry. If vhost process PID is in the registry and the +%% process is alive - the vhost is considered running. + +%% On termination, the process will notify of vhost going down. + +%% The process will also check periodically if the vhost is still +%% present in mnesia DB and stop the vhost supervision tree when it +%% disappears. + +-module(rabbit_vhost_process). -include("rabbit.hrl"). @@ -29,15 +40,26 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - start_link(VHost) -> gen_server2:start_link(?MODULE, [VHost], []). init([VHost]) -> - Interval = interval(), - timer:send_interval(Interval, check_vhost), - {ok, VHost}. + process_flag(trap_exit, true), + rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]), + try + %% Recover the vhost data and save it to vhost registry. + ok = rabbit_vhost:recover(VHost), + rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost} + catch _:Reason -> + rabbit_log:error("Unable to recover vhost ~p data.
Reason ~p~n" + " Stacktrace ~p", + [VHost, Reason, erlang:get_stacktrace()]), + {stop, Reason} + end. handle_call(_,_,VHost) -> {reply, ok, VHost}. @@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) -> handle_info(_, VHost) -> {noreply, VHost}. -terminate(_, _) -> ok. +terminate(shutdown, VHost) -> + %% Notify that vhost is stopped. + rabbit_vhost:vhost_down(VHost); +terminate(_, _VHost) -> + ok. code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index bbf006fbd3..c2b9e4047d 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -28,8 +28,4 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). init([VHost]) -> - {ok, {{one_for_all, 0, 1}, - [{rabbit_vhost_sup_watcher, - {rabbit_vhost_sup_watcher, start_link, [VHost]}, - intrinsic, ?WORKER_WAIT, worker, - [rabbit_vhost_sup]}]}}. + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 7ecac7a5d4..1d5db93fda 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,14 +23,17 @@ -export([init/1]). -export([start_link/0, start/0]). --export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). +-export([start_on_all_nodes/1]). + +-export([save_vhost_process/2]). +-export([is_vhost_alive/1]). %% Internal --export([stop_and_delete_vhost/1]). +-export([stop_and_delete_vhost/1, start_vhost/1]). --record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). start() -> case supervisor:start_child(rabbit_sup, {?MODULE, @@ -56,48 +59,23 @@ init([]) -> [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. 
start_on_all_nodes(VHost) -> - [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], - ok. + NodesStart = [ {Node, start_vhost(VHost, Node)} + || Node <- rabbit_nodes:all_running() ], + Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart), + case Failures of + [] -> ok; + Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} + end. delete_on_all_nodes(VHost) -> [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. -start_vhost(VHost, Node) when Node == node(self()) -> - start_vhost(VHost); -start_vhost(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} - end. - -start_vhost(VHost) -> - case rabbit_vhost:exists(VHost) of - false -> {error, {no_such_vhost, VHost}}; - true -> - case vhost_sup_pid(VHost) of - no_pid -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, _} -> ok; - {error, {already_started, _}} -> ok; - Error -> - rabbit_log:error("Could not start process tree " - "for vhost '~s': ~p", [VHost, Error]), - throw(Error) - end, - {ok, _} = vhost_sup_pid(VHost); - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} - end - end. - stop_and_delete_vhost(VHost) -> case get_vhost_sup(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, - vhost_sup_pid = VHostSupPid} = VHostSup -> + vhost_sup_pid = VHostSupPid} -> case is_process_alive(WrapperPid) of false -> ok; true -> @@ -106,7 +84,7 @@ stop_and_delete_vhost(VHost) -> [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete_object(?MODULE, VHostSup), + ets:delete(?MODULE, VHost), ok = rabbit_vhost:delete_storage(VHost); Other -> Other @@ -128,9 +106,31 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. 
-vhost_sup(VHost, Local) when Local == node(self()) -> - vhost_sup(VHost); +-spec init_vhost(rabbit_types:vhost()) -> ok. +init_vhost(VHost) -> + case start_vhost(VHost) of + {ok, _} -> ok; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + {error, Reason} -> + case vhost_restart_strategy() of + permanent -> + rabbit_log:error( + "Unable to initialize vhost data store for vhost '~s'." + " Reason: ~p", + [VHost, Reason]), + throw({error, Reason}); + transient -> + rabbit_log:warning( + "Unable to initialize vhost data store for vhost '~s'." + " The vhost will be stopped for this node. " + " Reason: ~p", + [VHost, Reason]), + ok + end + end. + +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. vhost_sup(VHost, Node) -> case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of {ok, Pid} when is_pid(Pid) -> @@ -139,9 +139,63 @@ vhost_sup(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. vhost_sup(VHost) -> - start_vhost(VHost). + case vhost_sup_pid(VHost) of + no_pid -> + case start_vhost(VHost) of + {ok, Pid} -> + true = is_vhost_alive(VHost), + {ok, Pid}; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + Error -> + throw(Error) + end; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end. + +-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. 
+start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started, Pid}} -> {ok, Pid}; + {error, Err} -> {error, Err} + end + end. + +-spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). +is_vhost_alive(VHost) -> +%% A vhost is considered alive if its supervision tree is alive and +%% saved in the ETS table + case get_vhost_sup(VHost) of + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid, + vhost_process_pid = VHostProcessPid} + when is_pid(WrapperPid), + is_pid(VHostSupPid), + is_pid(VHostProcessPid) -> + is_process_alive(WrapperPid) + andalso + is_process_alive(VHostSupPid) + andalso + is_process_alive(VHostProcessPid); + _ -> false + end. + -spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. save_vhost_sup(VHost, WrapperPid, VHostPid) -> @@ -150,6 +204,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) -> wrapper_pid = WrapperPid}), ok. +-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok. +save_vhost_process(VHost, VHostProcessPid) -> + true = ets:update_element(?MODULE, VHost, + {#vhost_sup.vhost_process_pid, VHostProcessPid}), + ok. + -spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. get_vhost_sup(VHost) -> case ets:lookup(?MODULE, VHost) of diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 8dbec30bff..8e23389bb9 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -27,24 +27,35 @@ -export([start_vhost_sup/1]). start_link(VHost) -> - supervisor2:start_link(?MODULE, [VHost]). + %% Using supervisor, because supervisor2 does not stop a started child when + %% another one fails to start. Bug? + supervisor:start_link(?MODULE, [VHost]). init([VHost]) -> %% 2 restarts in 5 minutes. One per message store.
{ok, {{one_for_all, 2, 300}, - [{rabbit_vhost_sup, - {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, - permanent, infinity, supervisor, - [rabbit_vhost_sup]}]}}. + [ + %% rabbit_vhost_sup is an empty supervisor container for + %% all data processes. + {rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + permanent, infinity, supervisor, + [rabbit_vhost_sup]}, + %% rabbit_vhost_process is a vhost identity process, which + %% is responsible for data recovery and vhost aliveness status. + %% See the module comments for more info. + {rabbit_vhost_process, + {rabbit_vhost_process, start_link, [VHost]}, + permanent, ?WORKER_WAIT, worker, + [rabbit_vhost_process]}]}}. + start_vhost_sup(VHost) -> case rabbit_vhost_sup:start_link(VHost) of {ok, Pid} -> %% Save vhost sup record with wrapper pid and vhost sup pid. ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), - %% We can start recover as soon as we have vhost_sup record saved - ok = rabbit_vhost:recover(VHost), {ok, Pid}; Other -> Other - end. + end.
\ No newline at end of file diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index efc5ca830e..68deec12cb 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -39,8 +39,7 @@ groups() -> single_node_single_vhost_limit, single_node_single_vhost_zero_limit, single_node_multiple_vhosts_limit, - single_node_multiple_vhosts_zero_limit, - single_node_vhost_deletion_forces_connection_closure + single_node_multiple_vhosts_zero_limit ], ClusterSize2Tests = [ most_basic_cluster_connection_count, @@ -51,8 +50,7 @@ groups() -> cluster_single_vhost_limit, cluster_single_vhost_limit2, cluster_single_vhost_zero_limit, - cluster_multiple_vhosts_zero_limit, - cluster_vhost_deletion_forces_connection_closure + cluster_multiple_vhosts_zero_limit ], [ {cluster_size_1_network, [], ClusterSize1Tests}, @@ -639,57 +637,6 @@ cluster_multiple_vhosts_zero_limit(Config) -> set_vhost_connection_limit(Config, VHost1, -1), set_vhost_connection_limit(Config, VHost2, -1). - -single_node_vhost_deletion_forces_connection_closure(Config) -> - VHost1 = <<"vhost1">>, - VHost2 = <<"vhost2">>, - - set_up_vhost(Config, VHost1), - set_up_vhost(Config, VHost2), - - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), - - [_Conn2] = open_connections(Config, [{0, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). 
- -cluster_vhost_deletion_forces_connection_closure(Config) -> - VHost1 = <<"vhost1">>, - VHost2 = <<"vhost2">>, - - set_up_vhost(Config, VHost1), - set_up_vhost(Config, VHost2), - - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), - - [_Conn2] = open_connections(Config, [{1, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). - vhost_limit_after_node_renamed(Config) -> A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl new file mode 100644 index 0000000000..a519d01af5 --- /dev/null +++ b/test/vhost_SUITE.erl @@ -0,0 +1,376 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(vhost_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). 
+ +all() -> + [ + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} + ]. + +groups() -> + ClusterSize1Tests = [ + single_node_vhost_deletion_forces_connection_closure, + vhost_failure_forces_connection_closure, + dead_vhost_connection_refused + ], + ClusterSize2Tests = [ + cluster_vhost_deletion_forces_connection_closure, + vhost_failure_forces_connection_closure, + dead_vhost_connection_refused, + vhost_failure_forces_connection_closure_on_failure_node, + dead_vhost_connection_refused_on_failure_node + ], + [ + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% see partitions_SUITE +-define(DELAY, 9000). + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). 
+ +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2). + +init_per_multinode_group(Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + clear_all_connection_tracking_tables(Config), + Config. + +end_per_testcase(Testcase, Config) -> + clear_all_connection_tracking_tables(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +clear_all_connection_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_connection_tracking, + clear_tracked_connection_tables_for_this_node, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +%% ------------------------------------------------------------------- +%% Test cases. 
+%% ------------------------------------------------------------------- +single_node_vhost_deletion_forces_connection_closure(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +vhost_failure_forces_connection_closure(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). 
+ +dead_vhost_connection_refused(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, VHost2), + timer:sleep(200), + + [_Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + expect_that_client_connection_is_rejected(Config, 0, VHost2), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + + +vhost_failure_forces_connection_closure_on_failure_node(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn20] = open_connections(Config, [{0, VHost2}]), + [_Conn21] = open_connections(Config, [{1, VHost2}]), + ?assertEqual(2, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, 0, VHost2), + timer:sleep(200), + %% Vhost2 connection on node 1 is still alive + ?assertEqual(1, count_connections_in(Config, VHost2)), + %% Vhost1 connection on node 0 is still alive + ?assertEqual(1, count_connections_in(Config, VHost1)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). 
+ +dead_vhost_connection_refused_on_failure_node(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, 0, VHost2), + timer:sleep(200), + %% Can open connections to vhost1 on node 0 and 1 + [_Conn10] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + [_Conn11] = open_connections(Config, [{1, VHost1}]), + ?assertEqual(2, count_connections_in(Config, VHost1)), + + %% Connection on vhost2 on node 0 is refused + [_Conn20] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + expect_that_client_connection_is_rejected(Config, 0, VHost2), + + %% Can open connections to vhost2 on node 1 + [_Conn21] = open_connections(Config, [{1, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost). + +force_vhost_failure(Config, Node, VHost) -> + force_vhost_failure(Config, Node, VHost, 10). + +force_vhost_failure(_Config, _Node, VHost, 0) -> + error({failed_to_force_vhost_failure, no_more_attempts_left, VHost}); +force_vhost_failure(Config, Node, VHost, Attempts) -> + MessageStorePid = get_message_store_pid(Config, VHost), + rabbit_ct_broker_helpers:rpc(Config, Node, + erlang, exit, + [MessageStorePid, force_vhost_failure]), + %% Give it a time to fail + timer:sleep(200), + case rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost_sup_sup, is_vhost_alive, + [VHost]) of + true -> force_vhost_failure(Config, Node, VHost, Attempts - 1); + false -> ok + end. 
%% Returns the pid of the msg_store_persistent child of VHost's supervisor,
%% resolved through node 0. Crashes with a badmatch when the supervisor
%% cannot be resolved or does not have exactly one such child.
get_message_store_pid(Config, VHost) ->
    {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(
                       Config, 0, rabbit_vhost_sup_sup, vhost_sup, [VHost]),
    Children = rabbit_ct_broker_helpers:rpc(
                 Config, 0, supervisor, which_children, [VHostSup]),
    [StorePid] = [Pid || {msg_store_persistent, Pid, _, _} <- Children],
    StorePid.

%% Checks that deleting <<"vhost2">> removes its tracked connection (opened
%% through node 1) while the <<"vhost1">> connection stays open until it is
%% closed explicitly.
cluster_vhost_deletion_forces_connection_closure(Config) ->
    Kept = <<"vhost1">>,
    Deleted = <<"vhost2">>,
    VHosts = [Kept, Deleted],

    lists:foreach(fun(VHost) -> set_up_vhost(Config, VHost) end, VHosts),
    lists:foreach(
      fun(VHost) ->
              ?assertEqual(0, count_connections_in(Config, VHost))
      end, VHosts),

    [KeptConn] = open_connections(Config, [{0, Kept}]),
    ?assertEqual(1, count_connections_in(Config, Kept)),

    [_DeletedConn] = open_connections(Config, [{1, Deleted}]),
    ?assertEqual(1, count_connections_in(Config, Deleted)),

    rabbit_ct_broker_helpers:delete_vhost(Config, Deleted),
    timer:sleep(200),
    ?assertEqual(0, count_connections_in(Config, Deleted)),

    close_connections([KeptConn]),
    ?assertEqual(0, count_connections_in(Config, Kept)),

    rabbit_ct_broker_helpers:delete_vhost(Config, Kept).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

%% Opens one connection per entry of NodesAndVHosts, where an entry is
%% either {Node, VHost} or a bare Node, then waits 500 ms before returning
%% the results in the same order as the entries.
open_connections(Config, NodesAndVHosts) ->
    %% The connection type is taken from the test configuration
    Opener = case ?config(connection_type, Config) of
                 network -> open_unmanaged_connection;
                 direct  -> open_unmanaged_connection_direct
             end,
    Open = fun({Node, VHost}) ->
                   rabbit_ct_client_helpers:Opener(Config, Node, VHost);
              (Node) ->
                   rabbit_ct_client_helpers:Opener(Config, Node)
           end,
    Conns = [Open(Target) || Target <- NodesAndVHosts],
    timer:sleep(500),
    Conns.

%% Closes every connection in Conns, then waits 500 ms.
close_connections(Conns) ->
    lists:foreach(fun rabbit_ct_client_helpers:close_connection/1, Conns),
    timer:sleep(500).
%% Returns the number of tracked connections in VHost, queried on node 0.
count_connections_in(Config, VHost) ->
    count_connections_in(Config, VHost, 0).

%% Waits 200 ms and then returns what
%% rabbit_connection_tracking:count_connections_in/1 reports for VHost on
%% the node with index NodeIndex.
count_connections_in(Config, VHost, NodeIndex) ->
    timer:sleep(200),
    rabbit_ct_broker_helpers:rpc(
      Config, NodeIndex,
      rabbit_connection_tracking, count_connections_in, [VHost]).

%% Creates VHost, grants <<"guest">> full permissions on it and sets its
%% connection limit to -1.
set_up_vhost(Config, VHost) ->
    rabbit_ct_broker_helpers:add_vhost(Config, VHost),
    rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
    set_vhost_connection_limit(Config, VHost, -1).

%% Sets the max-connections limit of VHost through node 0.
set_vhost_connection_limit(Config, VHost, Count) ->
    set_vhost_connection_limit(Config, 0, VHost, Count).

%% Sets the max-connections limit of VHost to Count by running the
%% set_vhost_limits control action against the node with index NodeIndex.
%% Crashes with a badmatch unless the action returns ok.
set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
    Node = rabbit_ct_broker_helpers:get_node_config(
             Config, NodeIndex, nodename),
    Limits = "{\"max-connections\": " ++ integer_to_list(Count) ++ "}",
    Opts = [{"-p", binary_to_list(VHost)}],
    ok = rabbit_ct_broker_helpers:control_action(
           set_vhost_limits, Node, [Limits], Opts).

%% Asserts that opening a client connection through node 0 fails.
expect_that_client_connection_is_rejected(Config) ->
    expect_that_client_connection_is_rejected(Config, 0).

%% Asserts that opening a client connection through the node with index
%% NodeIndex yields {error, _}; any other result crashes with a badmatch.
expect_that_client_connection_is_rejected(Config, NodeIndex) ->
    {error, _} =
        rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).

%% Same as expect_that_client_connection_is_rejected/2, but the connection
%% attempt targets VHost.
expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
    {error, _} =
        rabbit_ct_client_helpers:open_unmanaged_connection(
          Config, NodeIndex, VHost).
