summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Daniil Fedotov <dfedotov@pivotal.io> 2017-07-25 18:16:46 +0100
committer Daniil Fedotov <dfedotov@pivotal.io> 2017-07-25 18:16:46 +0100
commit 762e7b0006118a2017dce03cfd0fd2ddc2f5b940 (patch)
tree c9d8b453678ebfa4e905ee343d464d2f44568136 /src
parent e8a8d90c0e5737503a547431d6ec847a569c7b8c (diff)
parent 481dd67c6c068d1e3740c6b48fb87c161f483e67 (diff)
download rabbitmq-server-git-762e7b0006118a2017dce03cfd0fd2ddc2f5b940.tar.gz
Merge branch 'master' into rabbitmq-management-446
Diffstat (limited to 'src')
-rw-r--r-- src/gm.erl 34
-rw-r--r-- src/rabbit_connection_tracking.erl 12
-rw-r--r-- src/rabbit_connection_tracking_handler.erl 9
-rw-r--r-- src/rabbit_direct.erl 40
-rw-r--r-- src/rabbit_msg_store.erl 2
-rw-r--r-- src/rabbit_reader.erl 23
-rw-r--r-- src/rabbit_vhost.erl 12
-rw-r--r-- src/rabbit_vhost_process.erl (renamed from src/rabbit_vhost_sup_watcher.erl) 42
-rw-r--r-- src/rabbit_vhost_sup.erl 8
-rw-r--r-- src/rabbit_vhost_sup_sup.erl 146
-rw-r--r-- src/rabbit_vhost_sup_wrapper.erl 27
11 files changed, 260 insertions, 95 deletions
diff --git a/src/gm.erl b/src/gm.erl
index f67050affb..0da190a57d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -395,9 +395,8 @@
-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -416,6 +415,7 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
+ force_gc_timer,
txn_executor,
shutting_down
}).
@@ -508,7 +508,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
+ [{spawn_opt, [{fullsweep_after, 0}]}]).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
+ force_gc_timer = undefined,
txn_executor = TxnFun,
- shutting_down = false }, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ shutting_down = false }}.
handle_call({confirmed_broadcast, _Msg}, _From,
@@ -708,6 +709,10 @@ handle_cast(leave, State) ->
{stop, normal, State}.
+handle_info(force_gc, State) ->
+ garbage_collect(),
+ noreply(State #state { force_gc_timer = undefined });
+
handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {noreply, ensure_timers(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {reply, Reply, ensure_timers(State), flush_timeout(State)}.
+
+ensure_timers(State) ->
+ ensure_force_gc_timer(ensure_broadcast_timer(State)).
-flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_) -> 0.
+ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
+ when is_reference(TRef) ->
+ State;
+ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
+ TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
+ State #state { force_gc_timer = TRef }.
+
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
@@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [],
- broadcast_buffer_sz = 0}.
-
+ broadcast_buffer_sz = 0 }.
%% ---------------------------------------------------------------------------
%% View construction and inspection
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index f8c4c6541b..27c4bff810 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
- list/0, list/1, list_on_node/1, list_of_user/1,
+ list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
@@ -217,6 +217,16 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
+%% Lists the connections tracked on Node whose vhost field equals VHost.
+%% Returns [] when the tracking table for Node does not exist.
+-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node, VHost) ->
+    try mnesia:dirty_match_object(
+          tracked_connection_table_name_for(Node),
+          #tracked_connection{vhost = VHost, _ = '_'})
+    catch exit:{aborted, {no_exists, _}} -> []
+    end.
+
+
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index f1b844c60c..3ae17677e0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+handle_event(#event{type = vhost_down, props = Details}, State) ->
+ VHost = pget(name, Details),
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
+ " because the vhost database has stopped working",
+ [VHost, Node]),
+ close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost])),
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
Username = pget(name, Details),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 4b7f06305a..26e8f4d452 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
true ->
{error, not_allowed};
false ->
- case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
+ case is_vhost_alive(VHost, Creds, Pid) of
+ false ->
+ {error, {internal_error, vhost_is_down}};
+ true ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
end
end;
false -> {error, broker_not_found_on_node}
@@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) ->
[]
end.
+is_vhost_alive(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
+ true -> true;
+ false ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "vhost '~s' is down",
+ [Pid, VHost, PrintedUsername, VHost]),
+ false
+ end.
is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
PrintedUsername = case Username of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 275a9127d1..fe78075d0f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
- "for directory ~p~nError: ~p~n",
+ " for directory ~p~nError: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e23d382d6e..77914a00bf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
-handle_other(heartbeat_timeout,
+handle_other(heartbeat_timeout,
State = #v1{connection = #connection{timeout_sec = T}}) ->
maybe_emit_stats(State),
throw({heartbeat_timeout, T});
@@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol,
sock = Sock}, Reason) ->
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
{bool, true} ->
-
+
ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
Protocol);
_ ->
@@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
ok = is_over_connection_limit(VHost, User),
ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
+ ok = is_vhost_alive(VHost, User),
NewConnection = Connection#connection{vhost = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
+is_vhost_alive(VHostPath, User) ->
+ case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of
+ true -> ok;
+ false ->
+ rabbit_misc:protocol_error(internal_error,
+ "access to vhost '~s' refused for user '~s': "
+ "vhost '~s' is down",
+ [VHostPath, User#user.username, VHostPath])
+ end.
+
is_over_connection_limit(VHostPath, User) ->
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
false -> ok;
@@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
State1 = State#v1{connection_state = blocked,
throttle = update_last_blocked_at(Throttle)},
case CS of
- running ->
+ running ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
_ -> ok
end,
@@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_unblocked(Throttle) of
true ->
ok = send_unblocked(State),
- State#v1{throttle =
+ State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = false}};
false -> State
end.
@@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_blocked(Throttle) of
true ->
ok = send_blocked(State, blocked_by_message(Throttle)),
- State#v1{throttle =
+ State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = true}};
false -> maybe_send_unblocked(State)
end.
@@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS,
running -> maybe_block(State1);
%% unblock or re-enable blocking
blocked -> maybe_block(maybe_unblock(State1));
- _ -> State1
+ _ -> State1
end.
augment_connection_log_name(#connection{client_properties = ClientProperties,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 1ddb8c6335..30557fc7be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -26,6 +26,7 @@
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
+-export([vhost_down/1]).
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -54,8 +55,9 @@ recover() ->
%% rabbit_vhost_sup_sup will start the actual recovery.
%% So recovery will be run every time a vhost supervisor is restarted.
ok = rabbit_vhost_sup_sup:start(),
- [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
- || VHost <- rabbit_vhost:list()],
+
+ [ ok = rabbit_vhost_sup_sup:init_vhost(VHost)
+ || VHost <- rabbit_vhost:list()],
ok.
recover(VHost) ->
@@ -144,6 +146,12 @@ delete(VHostPath, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
+vhost_down(VHostPath) ->
+ ok = rabbit_event:notify(vhost_down,
+ [{name, VHostPath},
+ {node, node()},
+ {user_who_performed_action, ?INTERNAL_USER}]).
+
delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_process.erl
index be2c5f20bb..e3c815a727 100644
--- a/src/rabbit_vhost_sup_watcher.erl
+++ b/src/rabbit_vhost_process.erl
@@ -14,10 +14,21 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%
-%% This module implements a watcher process which should stop
-%% the parent supervisor if its vhost is missing from the mnesia DB
+%% This module implements a vhost identity process.
--module(rabbit_vhost_sup_watcher).
+%% On start this process will try to recover the vhost data and
+%% processes structure (queues and message stores).
+%% If recovered successfully, the process will save its PID
+%% to the vhost process registry. If the vhost process PID is in the registry
+%% and the process is alive, the vhost is considered running.
+
+%% On termination, the process will notify of the vhost going down.
+
+%% The process will also check periodically if the vhost is still
+%% present in the mnesia DB and stop the vhost supervision tree when it
+%% disappears.
+
+-module(rabbit_vhost_process).
-include("rabbit.hrl").
@@ -29,15 +40,26 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-
start_link(VHost) ->
gen_server2:start_link(?MODULE, [VHost], []).
init([VHost]) ->
- Interval = interval(),
- timer:send_interval(Interval, check_vhost),
- {ok, VHost}.
+ process_flag(trap_exit, true),
+ rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
+ try
+ %% Recover the vhost data and save it to vhost registry.
+ ok = rabbit_vhost:recover(VHost),
+ rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
+ Interval = interval(),
+ timer:send_interval(Interval, check_vhost),
+ {ok, VHost}
+ catch _:Reason ->
+ rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
+ " Stacktrace ~p",
+ [VHost, Reason, erlang:get_stacktrace()]),
+ {stop, Reason}
+ end.
handle_call(_,_,VHost) ->
{reply, ok, VHost}.
@@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
handle_info(_, VHost) ->
{noreply, VHost}.
-terminate(_, _) -> ok.
+terminate(shutdown, VHost) ->
+ %% Notify that vhost is stopped.
+ rabbit_vhost:vhost_down(VHost);
+terminate(_, _VHost) ->
+ ok.
code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
index bbf006fbd3..82899f8236 100644
--- a/src/rabbit_vhost_sup.erl
+++ b/src/rabbit_vhost_sup.erl
@@ -27,9 +27,5 @@
start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
-init([VHost]) ->
- {ok, {{one_for_all, 0, 1},
- [{rabbit_vhost_sup_watcher,
- {rabbit_vhost_sup_watcher, start_link, [VHost]},
- intrinsic, ?WORKER_WAIT, worker,
- [rabbit_vhost_sup]}]}}.
+init([_VHost]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 7ecac7a5d4..1d5db93fda 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,14 +23,17 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
--export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
+-export([start_on_all_nodes/1]).
+
+-export([save_vhost_process/2]).
+-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1]).
+-export([stop_and_delete_vhost/1, start_vhost/1]).
--record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
start() ->
case supervisor:start_child(rabbit_sup, {?MODULE,
@@ -56,48 +59,23 @@ init([]) ->
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
start_on_all_nodes(VHost) ->
- [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
- ok.
+ NodesStart = [ {Node, start_vhost(VHost, Node)}
+ || Node <- rabbit_nodes:all_running() ],
+ Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ case Failures of
+ [] -> ok;
+ Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
+ end.
delete_on_all_nodes(VHost) ->
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.
-start_vhost(VHost, Node) when Node == node(self()) ->
- start_vhost(VHost);
-start_vhost(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
- end.
-
-start_vhost(VHost) ->
- case rabbit_vhost:exists(VHost) of
- false -> {error, {no_such_vhost, VHost}};
- true ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, _} -> ok;
- {error, {already_started, _}} -> ok;
- Error ->
- rabbit_log:error("Could not start process tree "
- "for vhost '~s': ~p", [VHost, Error]),
- throw(Error)
- end,
- {ok, _} = vhost_sup_pid(VHost);
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
- end
- end.
-
stop_and_delete_vhost(VHost) ->
case get_vhost_sup(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
- vhost_sup_pid = VHostSupPid} = VHostSup ->
+ vhost_sup_pid = VHostSupPid} ->
case is_process_alive(WrapperPid) of
false -> ok;
true ->
@@ -106,7 +84,7 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete_object(?MODULE, VHostSup),
+ ets:delete(?MODULE, VHost),
ok = rabbit_vhost:delete_storage(VHost);
Other ->
Other
@@ -128,9 +106,31 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
-vhost_sup(VHost, Local) when Local == node(self()) ->
- vhost_sup(VHost);
+%% Starts the supervision tree for VHost on this node via start_vhost/1.
+%% Returns ok on success and {error, {no_such_vhost, VHost}} when
+%% start_vhost/1 reports the vhost as missing. For any other start
+%% failure the outcome depends on vhost_restart_strategy(): with
+%% 'permanent' the error is logged and thrown, with 'transient' a
+%% warning is logged and ok is returned.
+-spec init_vhost(rabbit_types:vhost()) ->
+    ok | {error, {no_such_vhost, rabbit_types:vhost()}}.
+init_vhost(VHost) ->
+    case start_vhost(VHost) of
+        {ok, _} -> ok;
+        {error, {no_such_vhost, VHost}} ->
+            {error, {no_such_vhost, VHost}};
+        {error, Reason} ->
+            case vhost_restart_strategy() of
+                permanent ->
+                    rabbit_log:error(
+                        "Unable to initialize vhost data store for vhost '~s'."
+                        " Reason: ~p",
+                        [VHost, Reason]),
+                    throw({error, Reason});
+                transient ->
+                    rabbit_log:warning(
+                        "Unable to initialize vhost data store for vhost '~s'."
+                        " The vhost will be stopped for this node. "
+                        " Reason: ~p",
+                        [VHost, Reason]),
+                    ok
+            end
+    end.
+
+-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
vhost_sup(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
@@ -139,9 +139,63 @@ vhost_sup(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
+-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
vhost_sup(VHost) ->
- start_vhost(VHost).
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ case start_vhost(VHost) of
+ {ok, Pid} ->
+ true = is_vhost_alive(VHost),
+ {ok, Pid};
+ {error, {no_such_vhost, VHost}} ->
+ {error, {no_such_vhost, VHost}};
+ Error ->
+ throw(Error)
+ end;
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end.
+
+%% Starts the supervision tree for VHost on Node by calling
+%% start_vhost/1 there over RPC. An {error, _} returned by the remote
+%% call is passed through unchanged; an RPC failure is reported as
+%% {error, RpcErr}.
+-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
+start_vhost(VHost, Node) ->
+    case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
+        {ok, Pid} when is_pid(Pid) ->
+            {ok, Pid};
+        %% Without this clause a remote start failure would crash the
+        %% caller with case_clause instead of being returned as an error.
+        {error, _} = Error ->
+            Error;
+        {badrpc, RpcErr} ->
+            {error, RpcErr}
+    end.
+
+-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
+start_vhost(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false -> {error, {no_such_vhost, VHost}};
+ true ->
+ case supervisor2:start_child(?MODULE, [VHost]) of
+ {ok, Pid} -> {ok, Pid};
+ {error, {already_started, Pid}} -> {ok, Pid};
+ {error, Err} -> {error, Err}
+ end
+ end.
+
+-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
+is_vhost_alive(VHost) ->
+%% A vhost is considered alive if its wrapper, vhost supervisor and vhost
+%% process pids are all saved in the ETS table and all three are alive.
+ case get_vhost_sup(VHost) of
+ #vhost_sup{wrapper_pid = WrapperPid,
+ vhost_sup_pid = VHostSupPid,
+ vhost_process_pid = VHostProcessPid}
+ when is_pid(WrapperPid),
+ is_pid(VHostSupPid),
+ is_pid(VHostProcessPid) ->
+ is_process_alive(WrapperPid)
+ andalso
+ is_process_alive(VHostSupPid)
+ andalso
+ is_process_alive(VHostProcessPid);
+ _ -> false
+ end.
+
-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
@@ -150,6 +204,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) ->
wrapper_pid = WrapperPid}),
ok.
+-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
+save_vhost_process(VHost, VHostProcessPid) ->
+ true = ets:update_element(?MODULE, VHost,
+ {#vhost_sup.vhost_process_pid, VHostProcessPid}),
+ ok.
+
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
get_vhost_sup(VHost) ->
case ets:lookup(?MODULE, VHost) of
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8dbec30bff..8e23389bb9 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -27,24 +27,35 @@
-export([start_vhost_sup/1]).
start_link(VHost) ->
- supervisor2:start_link(?MODULE, [VHost]).
+ %% Using supervisor, because supervisor2 does not stop a started child when
+ %% another one fails to start. Bug?
+ supervisor:start_link(?MODULE, [VHost]).
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.
{ok, {{one_for_all, 2, 300},
- [{rabbit_vhost_sup,
- {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
- permanent, infinity, supervisor,
- [rabbit_vhost_sup]}]}}.
+ [
+ %% rabbit_vhost_sup is an empty supervisor container for
+ %% all data processes.
+ {rabbit_vhost_sup,
+ {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
+ permanent, infinity, supervisor,
+ [rabbit_vhost_sup]},
+ %% rabbit_vhost_process is a vhost identity process, which
+ %% is responsible for data recovery and vhost aliveness status.
+ %% See the module comments for more info.
+ {rabbit_vhost_process,
+ {rabbit_vhost_process, start_link, [VHost]},
+ permanent, ?WORKER_WAIT, worker,
+ [rabbit_vhost_process]}]}}.
+
start_vhost_sup(VHost) ->
case rabbit_vhost_sup:start_link(VHost) of
{ok, Pid} ->
%% Save vhost sup record with wrapper pid and vhost sup pid.
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
- %% We can start recover as soon as we have vhost_sup record saved
- ok = rabbit_vhost:recover(VHost),
{ok, Pid};
Other ->
Other
- end.
+ end. \ No newline at end of file