diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-25 18:16:46 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-07-25 18:16:46 +0100 |
| commit | 762e7b0006118a2017dce03cfd0fd2ddc2f5b940 (patch) | |
| tree | c9d8b453678ebfa4e905ee343d464d2f44568136 /src | |
| parent | e8a8d90c0e5737503a547431d6ec847a569c7b8c (diff) | |
| parent | 481dd67c6c068d1e3740c6b48fb87c161f483e67 (diff) | |
| download | rabbitmq-server-git-762e7b0006118a2017dce03cfd0fd2ddc2f5b940.tar.gz | |
Merge branch 'master' into rabbitmq-management-446
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_vhost_process.erl (renamed from src/rabbit_vhost_sup_watcher.erl) | 42 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 146 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 27 |
11 files changed, 260 insertions, 95 deletions
diff --git a/src/gm.erl b/src/gm.erl index f67050affb..0da190a57d 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -395,9 +395,8 @@ -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -416,6 +415,7 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, + force_gc_timer, txn_executor, shutting_down }). @@ -508,7 +508,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], + [{spawn_opt, [{fullsweep_after, 0}]}]). leave(Server) -> gen_server2:cast(Server, leave). @@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, + force_gc_timer = undefined, txn_executor = TxnFun, - shutting_down = false }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + shutting_down = false }}. handle_call({confirmed_broadcast, _Msg}, _From, @@ -708,6 +709,10 @@ handle_cast(leave, State) -> {stop, normal, State}. +handle_info(force_gc, State) -> + garbage_collect(), + noreply(State #state { force_gc_timer = undefined }); + handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); @@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. + {noreply, ensure_timers(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + {reply, Reply, ensure_timers(State), flush_timeout(State)}. + +ensure_timers(State) -> + ensure_force_gc_timer(ensure_broadcast_timer(State)). -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. +ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) + when is_reference(TRef) -> + State; +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), + State #state { force_gc_timer = TRef }. + ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; @@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], - broadcast_buffer_sz = 0}. - + broadcast_buffer_sz = 0 }. %% --------------------------------------------------------------------------- %% View construction and inspection diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index f8c4c6541b..27c4bff810 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -34,7 +34,7 @@ delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, clear_tracked_connection_tables_for_this_node/0, register_connection/1, unregister_connection/1, - list/0, list/1, list_on_node/1, list_of_user/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, count_connections_in/1]). @@ -217,6 +217,16 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. +-spec list_on_node(node(), rabbit_types:vhsot()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node, VHost) -> + try mnesia:dirty_match_object( + tracked_connection_table_name_for(Node), + #tracked_connection{vhost = VHost, _ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + + -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. list_of_user(Username) -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index f1b844c60c..3ae17677e0 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +handle_event(#event{type = vhost_down, props = Details}, State) -> + VHost = pget(name, Details), + Node = pget(node, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" + " because the vhost database has stopped working", + [VHost, Node]), + close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])), + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> Username = pget(name, Details), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 4b7f06305a..26e8f4d452 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> true -> {error, not_allowed}; false -> - case AuthFun() of - {ok, User = #user{username = Username}} -> - notify_auth_result(Username, - user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); - {refused, Username, Msg, Args} -> - notify_auth_result(Username, - user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}]), - {error, {auth_failure, "Refused"}} + case is_vhost_alive(VHost, Creds, Pid) of + false -> + {error, {internal_error, vhost_is_down}}; + true -> + case AuthFun() of + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), + connect1(User, VHost, Protocol, Pid, Infos); + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), + {error, {auth_failure, "Refused"}} + end end end; false -> {error, broker_not_found_on_node} @@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) -> [] end. +is_vhost_alive(VHost, {Username, _Password}, Pid) -> + PrintedUsername = case Username of + none -> ""; + _ -> Username + end, + case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of + true -> true; + false -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [Pid, VHost, PrintedUsername, VHost]), + false + end. is_over_connection_limit(VHost, {Username, _Password}, Pid) -> PrintedUsername = case Username of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 275a9127d1..fe78075d0f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, ok; {error, RTErr} -> rabbit_log:error("Unable to save message store recovery terms" - "for directory ~p~nError: ~p~n", + " for directory ~p~nError: ~p~n", [Dir, RTErr]) end, State3 #msstate { index_state = undefined, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e23d382d6e..77914a00bf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> State; -handle_other(heartbeat_timeout, +handle_other(heartbeat_timeout, State = #v1{connection = #connection{timeout_sec = T}}) -> maybe_emit_stats(State), throw({heartbeat_timeout, T}); @@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol, sock = Sock}, Reason) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> - + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, Protocol); _ -> @@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, ok = is_over_connection_limit(VHost, User), ok = rabbit_access_control:check_vhost_access(User, VHost, Sock), + ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +is_vhost_alive(VHostPath, User) -> + case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of + true -> ok; + false -> + rabbit_misc:protocol_error(internal_error, + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [VHostPath, User#user.username, VHostPath]) + end. + is_over_connection_limit(VHostPath, User) -> try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of false -> ok; @@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) -> State1 = State#v1{connection_state = blocked, throttle = update_last_blocked_at(Throttle)}, case CS of - running -> + running -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater); _ -> ok end, @@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) -> case should_send_unblocked(Throttle) of true -> ok = send_unblocked(State), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = false}}; false -> State end. @@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) -> case should_send_blocked(Throttle) of true -> ok = send_blocked(State, blocked_by_message(Throttle)), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = true}}; false -> maybe_send_unblocked(State) end. @@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS, running -> maybe_block(State1); %% unblock or re-enable blocking blocked -> maybe_block(maybe_unblock(State1)); - _ -> State1 + _ -> State1 end. augment_connection_log_name(#connection{client_properties = ClientProperties, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 1ddb8c6335..30557fc7be 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -26,6 +26,7 @@ -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). +-export([vhost_down/1]). -spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). @@ -54,8 +55,9 @@ recover() -> %% rabbit_vhost_sup_sup will start the actual recovery. %% So recovery will be run every time a vhost supervisor is restarted. ok = rabbit_vhost_sup_sup:start(), - [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost) - || VHost <- rabbit_vhost:list()], + + [ ok = rabbit_vhost_sup_sup:init_vhost(VHost) + || VHost <- rabbit_vhost:list()], ok. recover(VHost) -> @@ -144,6 +146,12 @@ delete(VHostPath, ActingUser) -> rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. +vhost_down(VHostPath) -> + ok = rabbit_event:notify(vhost_down, + [{name, VHostPath}, + {node, node()}, + {user_who_performed_action, ?INTERNAL_USER}]). + delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_process.erl index be2c5f20bb..e3c815a727 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_process.erl @@ -14,10 +14,21 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% -%% This module implements a watcher process which should stop -%% the parent supervisor if its vhost is missing from the mnesia DB +%% This module implements a vhost identity process. --module(rabbit_vhost_sup_watcher). +%% On start this process will try to recover the vhost data and +%% processes structure (queues and message stores). +%% If recovered successfully, the process will save it's PID +%% to vhost process registry. If vhost process PID is in the registry and the +%% process is alive - the vhost is considered running. + +%% On termination, the ptocess will notify of vhost going down. + +%% The process will also check periodically if the vhost still +%% present in mnesia DB and stop the vhost supervision tree when it +%% disappears. + +-module(rabbit_vhost_process). -include("rabbit.hrl"). @@ -29,15 +40,26 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - start_link(VHost) -> gen_server2:start_link(?MODULE, [VHost], []). init([VHost]) -> - Interval = interval(), - timer:send_interval(Interval, check_vhost), - {ok, VHost}. + process_flag(trap_exit, true), + rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]), + try + %% Recover the vhost data and save it to vhost registry. + ok = rabbit_vhost:recover(VHost), + rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost} + catch _:Reason -> + rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" + " Stacktrace ~p", + [VHost, Reason, erlang:get_stacktrace()]), + {stop, Reason} + end. handle_call(_,_,VHost) -> {reply, ok, VHost}. @@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) -> handle_info(_, VHost) -> {noreply, VHost}. -terminate(_, _) -> ok. +terminate(shutdown, VHost) -> + %% Notify that vhost is stopped. + rabbit_vhost:vhost_down(VHost); +terminate(_, _VHost) -> + ok. code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index bbf006fbd3..82899f8236 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -27,9 +27,5 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). -init([VHost]) -> - {ok, {{one_for_all, 0, 1}, - [{rabbit_vhost_sup_watcher, - {rabbit_vhost_sup_watcher, start_link, [VHost]}, - intrinsic, ?WORKER_WAIT, worker, - [rabbit_vhost_sup]}]}}. +init([_VHost]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 7ecac7a5d4..1d5db93fda 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,14 +23,17 @@ -export([init/1]). -export([start_link/0, start/0]). --export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). +-export([start_on_all_nodes/1]). + +-export([save_vhost_process/2]). +-export([is_vhost_alive/1]). %% Internal --export([stop_and_delete_vhost/1]). +-export([stop_and_delete_vhost/1, start_vhost/1]). --record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). start() -> case supervisor:start_child(rabbit_sup, {?MODULE, @@ -56,48 +59,23 @@ init([]) -> [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> - [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], - ok. + NodesStart = [ {Node, start_vhost(VHost, Node)} + || Node <- rabbit_nodes:all_running() ], + Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart), + case Failures of + [] -> ok; + Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} + end. delete_on_all_nodes(VHost) -> [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. -start_vhost(VHost, Node) when Node == node(self()) -> - start_vhost(VHost); -start_vhost(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} - end. - -start_vhost(VHost) -> - case rabbit_vhost:exists(VHost) of - false -> {error, {no_such_vhost, VHost}}; - true -> - case vhost_sup_pid(VHost) of - no_pid -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, _} -> ok; - {error, {already_started, _}} -> ok; - Error -> - rabbit_log:error("Could not start process tree " - "for vhost '~s': ~p", [VHost, Error]), - throw(Error) - end, - {ok, _} = vhost_sup_pid(VHost); - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} - end - end. - stop_and_delete_vhost(VHost) -> case get_vhost_sup(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, - vhost_sup_pid = VHostSupPid} = VHostSup -> + vhost_sup_pid = VHostSupPid} -> case is_process_alive(WrapperPid) of false -> ok; true -> @@ -106,7 +84,7 @@ stop_and_delete_vhost(VHost) -> [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete_object(?MODULE, VHostSup), + ets:delete(?MODULE, VHost), ok = rabbit_vhost:delete_storage(VHost); Other -> Other @@ -128,9 +106,31 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. -vhost_sup(VHost, Local) when Local == node(self()) -> - vhost_sup(VHost); +-spec init_vhost(rabbit_types:vhost()) -> ok. +init_vhost(VHost) -> + case start_vhost(VHost) of + {ok, _} -> ok; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + {error, Reason} -> + case vhost_restart_strategy() of + permanent -> + rabbit_log:error( + "Unable to initialize vhost data store for vhost '~s'." + " Reason: ~p", + [VHost, Reason]), + throw({error, Reason}); + transient -> + rabbit_log:warning( + "Unable to initialize vhost data store for vhost '~s'." + " The vhost will be stopped for this node. " + " Reason: ~p", + [VHost, Reason]), + ok + end + end. + +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. vhost_sup(VHost, Node) -> case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of {ok, Pid} when is_pid(Pid) -> @@ -139,9 +139,63 @@ vhost_sup(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. vhost_sup(VHost) -> - start_vhost(VHost). + case vhost_sup_pid(VHost) of + no_pid -> + case start_vhost(VHost) of + {ok, Pid} -> + true = is_vhost_alive(VHost), + {ok, Pid}; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + Error -> + throw(Error) + end; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end. + +-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started, Pid}} -> {ok, Pid}; + {error, Err} -> {error, Err} + end + end. + +-spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). +is_vhost_alive(VHost) -> +%% A vhost is considered alive if it's supervision tree is alive and +%% saved in the ETS table + case get_vhost_sup(VHost) of + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid, + vhost_process_pid = VHostProcessPid} + when is_pid(WrapperPid), + is_pid(VHostSupPid), + is_pid(VHostProcessPid) -> + is_process_alive(WrapperPid) + andalso + is_process_alive(VHostSupPid) + andalso + is_process_alive(VHostProcessPid); + _ -> false + end. + -spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. save_vhost_sup(VHost, WrapperPid, VHostPid) -> @@ -150,6 +204,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) -> wrapper_pid = WrapperPid}), ok. +-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok. +save_vhost_process(VHost, VHostProcessPid) -> + true = ets:update_element(?MODULE, VHost, + {#vhost_sup.vhost_process_pid, VHostProcessPid}), + ok. + -spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. get_vhost_sup(VHost) -> case ets:lookup(?MODULE, VHost) of diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 8dbec30bff..8e23389bb9 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -27,24 +27,35 @@ -export([start_vhost_sup/1]). start_link(VHost) -> - supervisor2:start_link(?MODULE, [VHost]). + %% Using supervisor, because supervisor2 does not stop a started child when + %% another one fails to start. Bug? + supervisor:start_link(?MODULE, [VHost]). init([VHost]) -> %% 2 restarts in 5 minutes. One per message store. {ok, {{one_for_all, 2, 300}, - [{rabbit_vhost_sup, - {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, - permanent, infinity, supervisor, - [rabbit_vhost_sup]}]}}. + [ + %% rabbit_vhost_sup is an empty supervisor container for + %% all data processes. + {rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + permanent, infinity, supervisor, + [rabbit_vhost_sup]}, + %% rabbit_vhost_process is a vhost identity process, which + %% is responsible for data recovery and vhost aliveness status. + %% See the module comments for more info. + {rabbit_vhost_process, + {rabbit_vhost_process, start_link, [VHost]}, + permanent, ?WORKER_WAIT, worker, + [rabbit_vhost_process]}]}}. + start_vhost_sup(VHost) -> case rabbit_vhost_sup:start_link(VHost) of {ok, Pid} -> %% Save vhost sup record with wrapper pid and vhost sup pid. ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), - %% We can start recover as soon as we have vhost_sup record saved - ok = rabbit_vhost:recover(VHost), {ok, Pid}; Other -> Other - end. + end.
\ No newline at end of file |
