summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-03-17 13:03:03 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2017-04-12 12:13:43 +0100
commitc464fdefa9c451d25c42573190f72f24a227ed4a (patch)
tree3a277ae22ef2fecd4a01b37a0edc59342c64dc1b /src
parent536674232015587befed5be325bc89aeca3c17ff (diff)
downloadrabbitmq-server-git-c464fdefa9c451d25c42573190f72f24a227ed4a.tar.gz
Per-vhost supervision trees for queues and message stores.
Per-vhost message stores can be restarted, but queues contain references for old message stores in message store client data, also queues rely on message store process to report confirms for messages on disk. Because after message store restart queues will not get any confirms and will fail with badarg error trying to access message store with an old client, queue processes should be restarted together with message stores. Queue process cannot monitor message store because of backing_queue mechanism, so they should be controlled by a supervision tree. One tree will contain queues supervisor and message store proecesses. Per-vhost supervisor will restart if any of it's children dies. Per-vhost supervisor restart process will do queue and message store data recovery the same way as pre-3.7 global message store did, just with VHost as an argument and in a vhost data directory.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl28
-rw-r--r--src/rabbit_exchange.erl9
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_slave.erl1
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl103
-rw-r--r--src/rabbit_priority_queue.erl10
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_recovery_terms.erl104
-rw-r--r--src/rabbit_upgrade.erl3
-rw-r--r--src/rabbit_variable_queue.erl208
-rw-r--r--src/rabbit_vhost.erl47
-rw-r--r--src/rabbit_vhost_msg_store.erl61
-rw-r--r--src/rabbit_vhost_sup.erl34
-rw-r--r--src/rabbit_vhost_sup_sup.erl127
-rw-r--r--src/rabbit_vhost_sup_watcher.erl66
-rw-r--r--src/rabbit_vm.erl2
17 files changed, 549 insertions, 299 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 680a6a2a98..2fa18ac0e5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -819,10 +819,7 @@ boot_delegate() ->
recover() ->
rabbit_policy:recover(),
- Qs = rabbit_amqqueue:recover(),
- ok = rabbit_binding:recover(rabbit_exchange:recover(),
- [QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs).
+ rabbit_vhost:recover().
maybe_insert_default_data() ->
case rabbit_table:needs_default_data() of
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index d0c55b2c0e..a753f591c4 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -19,6 +19,7 @@
-behaviour(supervisor2).
-export([start_link/0, start_queue_process/3]).
+-export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]).
-export([init/1]).
@@ -36,14 +37,35 @@
%%----------------------------------------------------------------------------
start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+ supervisor2:start_link(?MODULE, []).
start_queue_process(Node, Q, StartMode) ->
- {ok, _SupPid, QPid} = supervisor2:start_child(
- {?SERVER, Node}, [Q, StartMode]),
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ {ok, Sup} = find_for_vhost(VHost, Node),
+ {ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
QPid.
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
+
+find_for_vhost(VHost, Node) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
+ [QSup] -> {ok, QSup};
+ Result -> {error, {queue_supervisor_not_found, Result}}
+ end.
+
+start_for_vhost(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ supervisor2:start_child(
+ VHostSup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+
+stop_for_vhost(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
+ ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup). \ No newline at end of file
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4061098a9d..cc2797489d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, policy_changed/2, callback/4, declare/7,
+-export([recover/1, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/1, immutable/1,
@@ -36,7 +36,7 @@
-type type() :: atom().
-type fun_name() :: atom().
--spec recover() -> [name()].
+-spec recover(rabbit_types:vhost()) -> [name()].
-spec callback
(rabbit_types:exchange(), fun_name(),
fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
@@ -107,10 +107,11 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
policy, user_who_performed_action]).
-recover() ->
+recover(VHost) ->
Xs = rabbit_misc:table_filter(
fun (#exchange{name = XName}) ->
- mnesia:read({rabbit_exchange, XName}) =:= []
+ XName#resource.virtual_host =:= VHost andalso
+ mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
X1 = case Tx of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b006e37eb2..94710aed43 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
--export([start/1, stop/0, delete_crashed/1]).
+-export([start/2, stop/1, delete_crashed/1]).
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
@@ -81,12 +81,12 @@
%% Backing queue
%% ---------------------------------------------------------------------------
-start(_DurableQueues) ->
+start(_Vhost, _DurableQueues) ->
%% This will never get called as this module will never be
%% installed as the default BQ implementation.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-stop() ->
+stop(_Vhost) ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61623c9441..aec974c10c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -194,6 +194,7 @@ stop_pending_slaves(QName, Pids) ->
[begin
rabbit_mirror_queue_misc:log_warning(
QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
+ %TODO: per-vhost supervisor
case erlang:process_info(Pid, dictionary) of
undefined -> ok;
{dictionary, Dict} ->
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
deleted file mode 100644
index 5031c5f043..0000000000
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ /dev/null
@@ -1,103 +0,0 @@
--module(rabbit_msg_store_vhost_sup).
-
--include("rabbit.hrl").
-
--behaviour(supervisor2).
-
--export([start_link/3, init/1, add_vhost/2, delete_vhost/2,
- client_init/5, successfully_recovered_state/2]).
-
-%% Internal
--export([start_store_for_vhost/4]).
-
-start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs);
- VhostsClientRefs == undefined ->
- supervisor2:start_link({local, Type}, ?MODULE,
- [Type, VhostsClientRefs, StartupFunState]).
-
-init([Type, VhostsClientRefs, StartupFunState]) ->
- ets:new(Type, [named_table, public]),
- {ok, {{simple_one_for_one, 1, 1},
- [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost,
- [Type, VhostsClientRefs, StartupFunState]},
- transient, infinity, supervisor, [rabbit_msg_store]}]}}.
-
-
-add_vhost(Type, VHost) ->
- VHostPid = maybe_start_store_for_vhost(Type, VHost),
- {ok, VHostPid}.
-
-start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid ->
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- ok = rabbit_file:ensure_dir(VHostDir),
- rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
- VhostRefs = refs_for_vhost(VHost, VhostsClientRefs),
- VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost),
- case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of
- {ok, Pid} ->
- ets:insert(Type, {VHost, Pid}),
- {ok, Pid};
- Other -> Other
- end;
- Pid when is_pid(Pid) ->
- {error, {already_started, Pid}}
- end.
-
-startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) ->
- QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames,
- VH == VHost ],
- {Fun, {start, QNamesForVhost}};
-startup_fun_state_for_vhost(State, _VHost) -> State.
-
-refs_for_vhost(_, undefined) -> undefined;
-refs_for_vhost(VHost, Refs) ->
- case maps:find(VHost, Refs) of
- {ok, Val} -> Val;
- error -> []
- end.
-
-
-delete_vhost(Type, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid -> ok;
- Pid when is_pid(Pid) ->
- supervisor2:terminate_child(Type, Pid),
- cleanup_vhost_store(Type, VHost, Pid)
- end,
- ok.
-
-client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
- VHostPid = maybe_start_store_for_vhost(Type, VHost),
- rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun).
-
-maybe_start_store_for_vhost(Type, VHost) ->
- case supervisor2:start_child(Type, [VHost]) of
- {ok, Pid} -> Pid;
- {error, {already_started, Pid}} -> Pid;
- Error -> throw(Error)
- end.
-
-vhost_store_pid(Type, VHost) ->
- case ets:lookup(Type, VHost) of
- [] -> no_pid;
- [{VHost, Pid}] ->
- case erlang:is_process_alive(Pid) of
- true -> Pid;
- false ->
- cleanup_vhost_store(Type, VHost, Pid),
- no_pid
- end
- end.
-
-cleanup_vhost_store(Type, VHost, Pid) ->
- ets:delete_object(Type, {VHost, Pid}).
-
-successfully_recovered_state(Type, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid ->
- throw({message_store_not_started, Type, VHost});
- Pid when is_pid(Pid) ->
- rabbit_msg_store:successfully_recovered_state(Pid)
- end.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 34e23260ba..41e65e8a1f 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -30,7 +30,7 @@
-export([enable/0]).
--export([start/1, stop/0]).
+-export([start/2, stop/1]).
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
@@ -83,22 +83,22 @@ enable() ->
%%----------------------------------------------------------------------------
-start(QNames) ->
+start(VHost, QNames) ->
BQ = bq(),
%% TODO this expand-collapse dance is a bit ridiculous but it's what
%% rabbit_amqqueue:recover/0 expects. We could probably simplify
%% this if we rejigged recovery a bit.
{DupNames, ExpNames} = expand_queues(QNames),
- case BQ:start(ExpNames) of
+ case BQ:start(VHost, ExpNames) of
{ok, ExpRecovery} ->
{ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
Else ->
Else
end.
-stop() ->
+stop(VHost) ->
BQ = bq(),
- BQ:stop().
+ BQ:stop(VHost).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bf80fe53a5..e3a1e35ab3 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -17,10 +17,10 @@
-module(rabbit_queue_index).
-export([erase/1, init/3, reset_state/1, recover/6,
- terminate/2, delete_and_terminate/1,
+ terminate/3, delete_and_terminate/1,
pre_publish/7, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
- read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
+ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3]).
@@ -261,7 +261,7 @@
on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}.
--spec terminate([any()], qistate()) -> qistate().
+-spec terminate(rabbit_types:vhsot(), [any()], qistate()) -> qistate().
-spec delete_and_terminate(qistate()) -> qistate().
-spec publish(rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(), boolean(),
@@ -278,7 +278,7 @@
-spec next_segment_boundary(seq_id()) -> seq_id().
-spec bounds(qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}.
--spec start([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
+-spec start(rabbit_types:vhsot(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
-spec add_queue_ttl() -> 'ok'.
@@ -321,9 +321,9 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State = #qistate { dir = Dir }) ->
+terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_terms:store(filename:basename(Dir),
+ rabbit_recovery_terms:store(VHost, filename:basename(Dir),
[{segments, SegmentCounts} | Terms]),
State1.
@@ -491,34 +491,36 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
-start(DurableQueueNames) ->
- ok = rabbit_recovery_terms:start(),
+start(VHost, DurableQueueNames) ->
+ ok = rabbit_recovery_terms:start(VHost),
{DurableTerms, DurableDirectories} =
lists:foldl(
fun(QName, {RecoveryTerms, ValidDirectories}) ->
DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
+ RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
{error, _} -> non_clean_shutdown;
{ok, Terms} -> Terms
end,
{[RecoveryInfo | RecoveryTerms],
sets:add_element(DirName, ValidDirectories)}
end, {[], sets:new()}, DurableQueueNames),
-
%% Any queue directory we've not been asked to recover is considered garbage
rabbit_file:recursive_delete(
[DirName ||
- DirName <- all_queue_directory_names(),
+ DirName <- all_queue_directory_names(VHost),
not sets:is_element(filename:basename(DirName), DurableDirectories)]),
-
- rabbit_recovery_terms:clear(),
+ rabbit_recovery_terms:clear(VHost),
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
OrderedTerms = lists:reverse(DurableTerms),
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-stop() -> rabbit_recovery_terms:stop().
+stop(VHost) -> rabbit_recovery_terms:stop(VHost).
+
+all_queue_directory_names(VHost) ->
+ filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
+ "queues", "*"])).
all_queue_directory_names() ->
filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
@@ -1447,6 +1449,6 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
end,
ok.
-update_recovery_term(#resource{} = QueueName, Term) ->
+update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
Key = queue_name_to_dir_name(QueueName),
- rabbit_recovery_terms:store(Key, Term).
+ rabbit_recovery_terms:store(VHost, Key, Term).
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index c941126cd3..19dbcd2fd0 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -21,9 +21,9 @@
-behaviour(gen_server).
--export([start/0, stop/0, store/2, read/1, clear/0]).
+-export([start/1, stop/1, store/3, read/2, clear/1]).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -32,40 +32,55 @@
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
-%%----------------------------------------------------------------------------
-
--spec start() -> rabbit_types:ok_or_error(term()).
--spec stop() -> rabbit_types:ok_or_error(term()).
--spec store(file:filename(), term()) -> rabbit_types:ok_or_error(term()).
--spec read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
--spec clear() -> 'ok'.
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
--define(SERVER, ?MODULE).
+-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
+-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
+-spec clear(rabbit_types:vhost()) -> 'ok'.
-start() -> rabbit_sup:start_child(?MODULE).
+%%----------------------------------------------------------------------------
-stop() -> rabbit_sup:stop_child(?MODULE).
+start(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, _} = supervisor2:start_child(
+ VHostSup,
+ {?MODULE,
+ {?MODULE, start_link, [VHost]},
+ transient, ?WORKER_WAIT, worker,
+ [?MODULE]}),
+ ok.
+
+stop(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ case supervisor:terminate_child(VHostSup, ?MODULE) of
+ ok -> supervisor:delete_child(VHostSup, ?MODULE);
+ E -> E
+ end.
-store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}).
+store(VHost, DirBaseName, Terms) ->
+ dets:insert(VHost, {DirBaseName, Terms}).
-read(DirBaseName) ->
- case dets:lookup(?MODULE, DirBaseName) of
+read(VHost, DirBaseName) ->
+ case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-clear() ->
- ok = dets:delete_all_objects(?MODULE),
- flush().
+clear(VHost) ->
+ ok = dets:delete_all_objects(VHost),
+ flush(VHost).
-start_link() -> gen_server:start_link(?MODULE, [], []).
+start_link(VHost) ->
+ gen_server:start_link(?MODULE, [VHost], []).
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
- open_table(),
+ open_global_table(),
try
QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
Dirs = case rabbit_file:list_dir(QueuesDir) of
@@ -75,37 +90,47 @@ upgrade_recovery_terms() ->
[begin
File = filename:join([QueuesDir, Dir, "clean.dot"]),
case rabbit_file:read_term_file(File) of
- {ok, Terms} -> ok = store(Dir, Terms);
+ {ok, Terms} -> ok = store(?MODULE, Dir, Terms);
{error, _} -> ok
end,
file:delete(File)
end || Dir <- Dirs],
ok
after
- close_table()
+ close_global_table()
end.
persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
dets_upgrade(Fun)->
- open_table(),
+ open_global_table(),
try
ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
- store(DirBaseName, Fun(Terms)),
+ store(?MODULE, DirBaseName, Fun(Terms)),
Acc
end, ok, ?MODULE),
ok
after
- close_table()
+ close_global_table()
end.
+open_global_table() ->
+ File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
+ {ok, _} = dets:open_file(?MODULE, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]).
+
+close_global_table() ->
+ ok = dets:sync(?MODULE),
+ ok = dets:close(?MODULE).
+
%%----------------------------------------------------------------------------
-init(_) ->
+init([VHost]) ->
process_flag(trap_exit, true),
- open_table(),
- {ok, undefined}.
+ open_table(VHost),
+ {ok, VHost}.
handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
@@ -113,22 +138,23 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
handle_info(_Info, State) -> {noreply, State}.
-terminate(_Reason, _State) ->
- close_table().
+terminate(_Reason, VHost) ->
+ close_table(VHost).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-open_table() ->
- File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
- {ok, _} = dets:open_file(?MODULE, [{file, File},
- {ram_file, true},
- {auto_save, infinity}]).
+open_table(VHost) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ File = filename:join(VHostDir, "recovery.dets"),
+ {ok, _} = dets:open_file(VHost, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]).
-flush() -> ok = dets:sync(?MODULE).
+flush(VHost) -> ok = dets:sync(VHost).
-close_table() ->
- ok = flush(),
- ok = dets:close(?MODULE).
+close_table(VHost) ->
+ ok = flush(VHost),
+ ok = dets:close(VHost).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index eae464219a..ed2143a2b9 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -262,7 +262,8 @@ maybe_upgrade_local() ->
maybe_migrate_queues_to_per_vhost_storage() ->
Result = case rabbit_version:upgrades_required(message_store) of
{error, version_not_available} -> version_not_available;
- {error, starting_from_scratch} -> starting_from_scratch;
+ {error, starting_from_scratch} ->
+ starting_from_scratch;
{error, _} = Err -> throw(Err);
{ok, []} -> ok;
{ok, Upgrades} -> apply_upgrades(message_store,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 300da96441..556f92acb7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -28,19 +28,17 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
--export([start/1, stop/0]).
-
-%% exported for parallel map
--export([add_vhost_msg_store/1]).
+-export([start/2, stop/1]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/3, stop_msg_store/1, init/6]).
-export([move_messages_to_vhost_store/0]).
--export([stop_vhost_msg_store/1]).
+
-include_lib("stdlib/include/qlc.hrl").
-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
@@ -320,7 +318,10 @@
%% number of reduce_memory_usage executions, once it
%% reaches a threshold the queue will manually trigger a runtime GC
%% see: maybe_execute_gc/1
- memory_reduction_run_count
+ memory_reduction_run_count,
+ %% Queue data is grouped by VHost. We need to store it
+ %% to work with queue index.
+ virtual_host
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -345,8 +346,6 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -477,66 +476,37 @@ explicit_gc_run_operation_threshold_for_mode(Mode) ->
%% Public API
%%----------------------------------------------------------------------------
-start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
+start(VHost, DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
%% Group recovery terms by vhost.
- {[], VhostRefs} = lists:foldl(
- fun
- %% We need to skip a queue name
- (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
- {QNames, VhostRefs};
- (Terms, {[QueueName | QNames], VhostRefs}) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {QNames, VhostRefs};
- Ref ->
- #resource{virtual_host = VHost} = QueueName,
- Refs = case maps:find(VHost, VhostRefs) of
- {ok, Val} -> Val;
- error -> []
- end,
- {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
- end
- end,
- {DurableQueues, #{}},
- AllTerms),
- start_msg_store(VhostRefs, StartFunState),
+ ClientRefs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
-stop() ->
- ok = stop_msg_store(),
- ok = rabbit_queue_index:stop().
-
-start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
- [?TRANSIENT_MSG_STORE_SUP,
- undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]),
- %% Start message store for all known vhosts
- VHosts = rabbit_vhost:list(),
- %% TODO: recovery is limited by queue index recovery
- %% pool size. There is no point in parallelizing vhost
- %% recovery until there will be a queue index
- %% recovery pool per vhost
- lists:foreach(fun(Vhost) ->
- add_vhost_msg_store(Vhost)
- end,
- lists:sort(VHosts)),
- ok.
+stop(VHost) ->
+ ok = stop_msg_store(VHost),
+ ok = rabbit_queue_index:stop(VHost).
-add_vhost_msg_store(VHost) ->
+start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?TRANSIENT_MSG_STORE,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?PERSISTENT_MSG_STORE,
+ Refs,
+ StartFunState),
rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
-stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
-
-stop_vhost_msg_store(VHost) ->
- rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
- rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+stop_msg_store(VHost) ->
+ rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
+ rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
ok.
init(Queue, Recover, Callback) ->
@@ -555,12 +525,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
- AsyncCallback, VHost));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
+ AsyncCallback, VHost), VHost);
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
@@ -569,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
@@ -579,17 +549,18 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback,
VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store_vhost_sup:successfully_recovered_state(
- ?PERSISTENT_MSG_STORE_SUP, VHost),
+ rabbit_vhost_msg_store:successfully_recovered_state(
+ VHost,
+ ?PERSISTENT_MSG_STORE),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient).
+ PersistentClient, TransientClient, VHost).
process_recovery_terms(Terms=non_clean_shutdown) ->
{rabbit_guid:gen(), Terms};
@@ -600,7 +571,8 @@ process_recovery_terms(Terms) ->
end.
terminate(_Reason, State) ->
- State1 = #vqstate { persistent_count = PCount,
+ State1 = #vqstate { virtual_host = VHost,
+ persistent_count = PCount,
persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -614,9 +586,9 @@ terminate(_Reason, State) ->
Terms = [{persistent_ref, PRef},
{persistent_count, PCount},
{persistent_bytes, PBytes}],
- a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
- msg_store_clients = undefined }).
+ a(State1#vqstate {
+ index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState),
+ msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -1270,12 +1242,12 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
Callback, VHost).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
- rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () ->
- Callback(?MODULE, CloseFDsFun)
- end,
- VHost).
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
+ rabbit_vhost_msg_store:client_init(VHost, MsgStore,
+ Ref, MsgOnDiskFun,
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
+ end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -1379,7 +1351,7 @@ expand_delta(_SeqId, #delta { count = Count,
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient) ->
+ PersistentClient, TransientClient, VHost) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{DeltaCount1, DeltaBytes1} =
@@ -1446,7 +1418,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
mode = default,
- memory_reduction_run_count = 0},
+ memory_reduction_run_count = 0,
+ virtual_host = VHost},
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -2771,16 +2744,21 @@ multiple_routing_keys() ->
%% Assumes message store is not running
transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
move_messages_to_vhost_store() ->
+ case list_persistent_queues() of
+ [] -> ok;
+ Queues -> move_messages_to_vhost_store(Queues)
+ end.
+
+move_messages_to_vhost_store(Queues) ->
log_upgrade("Moving messages to per-vhost message store"),
- Queues = list_persistent_queues(),
%% Move the queue index for each persistent queue to the new store
lists:foreach(
fun(Queue) ->
@@ -2791,20 +2769,18 @@ move_messages_to_vhost_store() ->
%% Legacy (global) msg_store may require recovery.
%% This upgrade step should only be started
%% if we are upgrading from a pre-3.7.0 version.
- {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues),
+ {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues),
OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
+
+ VHosts = rabbit_vhost:list(),
+
%% New store should not be recovered.
- NewStoreSup = start_new_store_sup(),
- Vhosts = rabbit_vhost:list(),
- lists:foreach(fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost)
- end,
- Vhosts),
+ ok = start_new_store(VHosts),
MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
?QUEUE_MIGRATION_BATCH_SIZE),
in_batches(MigrationBatchSize,
- {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]},
+ {rabbit_variable_queue, migrate_queue, [OldStore]},
QueuesWithTerms,
"message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
@@ -2813,7 +2789,7 @@ move_messages_to_vhost_store() ->
delete_old_store(OldStore),
ok = rabbit_queue_index:stop(),
- ok = rabbit_sup:stop_child(NewStoreSup),
+ ok = stop_new_store(VHosts),
ok.
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
@@ -2840,12 +2816,12 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
-migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) ->
+migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore) ->
log_upgrade_verbose(
"Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
[Name, VHost]),
OldStoreClient = get_global_store_client(OldStore),
- NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup),
+ NewStoreClient = get_per_vhost_store_client(QueueName),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
@@ -2881,12 +2857,11 @@ migrate_message(MsgId, OldC, NewC) ->
_ -> OldC
end.
-get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) ->
- rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
- rabbit_guid:gen(),
- fun(_,_) -> ok end,
- fun() -> ok end,
- VHost).
+get_per_vhost_store_client(#resource{virtual_host = VHost}) ->
+ rabbit_vhost_msg_store:client_init(VHost, ?PERSISTENT_MSG_STORE,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end).
get_global_store_client(OldStore) ->
rabbit_msg_store:client_init(OldStore,
@@ -2905,9 +2880,11 @@ list_persistent_queues() ->
mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
-start_recovery_terms(Queues) ->
+read_old_recovery_terms([]) ->
+ {[], [], ?EMPTY_START_FUN_STATE};
+read_old_recovery_terms(Queues) ->
QueueNames = [Name || #amqqueue{name = Name} <- Queues],
- {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
+ {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames),
Refs = [Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
begin
@@ -2923,13 +2900,22 @@ run_old_persistent_store(Refs, StartFunState) ->
Refs, StartFunState]),
OldStoreName.
-start_new_store_sup() ->
- % Start persistent store sup without recovery.
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
- rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE_SUP,
- undefined, {fun (ok) -> finished end, ok}]),
- ?PERSISTENT_MSG_STORE_SUP.
+start_new_store(VHosts) ->
+ %% Ensure vhost supervisor is started, so we can add vhsots to it.
+ %% TODO: Start message store for vhost without a supervisor.
+ lists:foreach(fun(VHost) ->
+ % Start persistent store without recovery.
+ {ok, _} = rabbit_vhost_msg_store:start(VHost, ?PERSISTENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE)
+ end,
+ VHosts),
+ ok.
+
+stop_new_store(VHosts) ->
+ lists:foreach(fun(VHost) ->
+ ok = rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE)
+ end,
+ VHosts),
+ ok.
delete_old_store(OldStore) ->
ok = rabbit_sup:stop_child(OldStore),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 5ed23d9114..6d046021fd 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,11 +20,12 @@
%%----------------------------------------------------------------------------
+-export([recover/0, recover/1]).
-export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
--export([purge_messages/1]).
+-export([delete_storage/1]).
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
@@ -42,6 +43,36 @@
-spec info_all(rabbit_types:info_keys(), reference(), pid()) ->
'ok'.
+recover() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ rabbit_amqqueue:on_node_down(node()),
+
+ rabbit_amqqueue:warn_file_limit(),
+ %% rabbit_vhost_sup_sup will start the actual recovery.
+ %% So recovery will be run every time a vhost supervisor is restarted.
+ ok = rabbit_vhost_sup_sup:start(),
+ [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
+ || VHost <- rabbit_vhost:list()],
+ ok.
+
+recover(VHost) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ rabbit_log:info("Making sure data directory '~s' for vhost '~s' exists~n",
+ [VHostDir, VHost]),
+ VHostStubFile = filename:join(VHostDir, ".vhost"),
+ ok = rabbit_file:ensure_dir(VHostStubFile),
+ ok = file:write_file(VHostStubFile, VHost),
+rabbit_log:info("Starting vhost ~p~n", [VHost]),
+ Qs = rabbit_amqqueue:recover(VHost),
+rabbit_log:info("Queues recovered for vhost ~p~n", [VHost]),
+ ok = rabbit_binding:recover(rabbit_exchange:recover(VHost),
+ [QName || #amqqueue{name = QName} <- Qs]),
+rabbit_log:info("Bindings recovered for vhost ~p~n", [VHost]),
+ ok = rabbit_amqqueue:start(Qs),
+rabbit_log:info("Queues started for vhost ~p~n", [VHost]),
+ ok.
+
%%----------------------------------------------------------------------------
-define(INFO_KEYS, [name, tracing]).
@@ -96,17 +127,16 @@ delete(VHostPath, ActingUser) ->
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
[ok = Fun() || Fun <- Funs],
+ %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
+ %% on all the nodes.
+ rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
-purge_messages(VHost) ->
+delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
- %% Message store is stopped to close file handles
- rabbit_variable_queue:stop_vhost_msg_store(VHost),
- ok = rabbit_file:recursive_delete([VhostDir]),
- %% Ensure the store is terminated even if it was restarted during the delete operation
- %% above.
- rabbit_variable_queue:stop_vhost_msg_store(VHost).
+ %% Message store should be closed when vhost supervisor is closed.
+ ok = rabbit_file:recursive_delete([VhostDir]).
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;
@@ -134,7 +164,6 @@ internal_delete(VHostPath, ActingUser) ->
Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser)
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
- purge_messages(VHostPath),
Fs1 ++ Fs2.
exists(VHostPath) ->
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
new file mode 100644
index 0000000000..482ad082b8
--- /dev/null
+++ b/src/rabbit_vhost_msg_store.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_msg_store).
+
+-include("rabbit.hrl").
+
+-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]).
+
+
+start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
+ ClientRefs == undefined ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ supervisor2:start_child(VHostSup,
+ {Type, {rabbit_msg_store, start_link,
+ [Type, VHostDir, ClientRefs, StartupFunState]},
+ transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}).
+
+stop(VHost, Type) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ ok = supervisor2:terminate_child(VHostSup, Type),
+ ok = supervisor2:delete_child(VHostSup, Type).
+
+client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
+ with_vhost_store(VHost, Type, fun(StorePid) ->
+ rabbit_msg_store:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun)
+ end).
+
+with_vhost_store(VHost, Type, Fun) ->
+ case vhost_store_pid(VHost, Type) of
+ no_pid ->
+ throw({message_store_not_started, Type, VHost});
+ Pid when is_pid(Pid) ->
+ Fun(Pid)
+ end.
+
+vhost_store_pid(VHost, Type) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ case supervisor2:find_child(VHostSup, Type) of
+ [Pid] -> Pid;
+ [] -> no_pid
+ end.
+
+successfully_recovered_state(VHost, Type) ->
+ with_vhost_store(VHost, Type, fun(StorePid) ->
+ rabbit_msg_store:successfully_recovered_state(StorePid)
+ end). \ No newline at end of file
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
new file mode 100644
index 0000000000..b8c7a649e5
--- /dev/null
+++ b/src/rabbit_vhost_sup.erl
@@ -0,0 +1,34 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup).
+
+-include("rabbit.hrl").
+
+%% Supervisor is a per-vhost supervisor to contain queues and message stores
+-behaviour(supervisor2).
+-export([init/1]).
+-export([start_link/1]).
+
+start_link(VHost) ->
+ supervisor2:start_link(?MODULE, [VHost]).
+
+init([VHost]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{rabbit_vhost_sup_watcher,
+ {rabbit_vhost_sup_watcher, start_link, [VHost]},
+ intrinsic, ?WORKER_WAIT, worker,
+ [rabbit_vhost_sup]}]}}.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
new file mode 100644
index 0000000000..58a5925af9
--- /dev/null
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -0,0 +1,127 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup_sup).
+
+-include("rabbit.hrl").
+
+-behaviour(supervisor2).
+
+-export([init/1]).
+
+-export([start_link/0, start/0]).
+-export([vhost_sup/1, vhost_sup/2]).
+-export([start_vhost/1, stop_and_delete_vhost/1, delete_on_all_nodes/1]).
+
+start() ->
+ rabbit_sup:start_supervisor_child(?MODULE).
+
+start_link() ->
+ supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ ets:new(?MODULE, [named_table, public]),
+ {ok, {{simple_one_for_one, 1, 5},
+ [{rabbit_vhost, {rabbit_vhost_sup_sup, start_vhost, []},
+ transient, infinity, supervisor,
+ [rabbit_vhost_sup_sup, rabbit_vhost_sup]}]}}.
+
+start_vhost(VHost) ->
+ case rabbit_vhost_sup:start_link(VHost) of
+ {ok, Pid} ->
+ ok = save_vhost_pid(VHost, Pid),
+ ok = rabbit_vhost:recover(VHost),
+ {ok, Pid};
+ Other ->
+ Other
+ end.
+
+stop_and_delete_vhost(VHost) ->
+ case vhost_pid(VHost) of
+ no_pid -> ok;
+ Pid when is_pid(Pid) ->
+ rabbit_log:info("Stopping vhost supervisor ~p for vhost ~p~n",
+ [Pid, VHost]),
+ case supervisor2:terminate_child(?MODULE, Pid) of
+ ok ->
+ ok = rabbit_vhost:delete_storage(VHost);
+ Other ->
+ Other
+ end
+ end.
+
+delete_on_all_nodes(VHost) ->
+%% TODO: failing nodes
+ [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
+ ok.
+
+%% We take an optimistic approach whan stopping a remote VHost supervisor.
+stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
+ stop_and_delete_vhost(VHost);
+stop_and_delete_vhost(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, stop_and_delete_vhost, [VHost]) of
+ ok -> ok;
+ {badrpc, RpcErr} ->
+ rabbit_log:error("Failed to stop and delete a vhost ~p"
+ " on node ~p."
+ " Reason: ~p",
+ [VHost, Node, RpcErr]),
+ {error, RpcErr}
+ end.
+
+vhost_sup(VHost, Local) when Local == node(self()) ->
+ vhost_sup(VHost);
+vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {badrpc, RpcErr} ->
+ {error, RpcErr}
+ end.
+
+-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
+vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false -> {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_pid(VHost) of
+ no_pid ->
+ case supervisor2:start_child(?MODULE, [VHost]) of
+ {ok, Pid} -> {ok, Pid};
+ {error, {already_started, Pid}} -> {ok, Pid};
+ Error -> throw(Error)
+ end;
+ Pid when is_pid(Pid) ->
+ {ok, Pid}
+ end
+ end.
+
+save_vhost_pid(VHost, Pid) ->
+ true = ets:insert(?MODULE, {VHost, Pid}),
+ ok.
+
+-spec vhost_pid(rabbit_types:vhost()) -> no_pid | pid().
+vhost_pid(VHost) ->
+ case ets:lookup(?MODULE, VHost) of
+ [] -> no_pid;
+ [{VHost, Pid}] ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false ->
+ ets:delete_object(?MODULE, {VHost, Pid}),
+ no_pid
+ end
+ end.
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl
new file mode 100644
index 0000000000..3ce726621f
--- /dev/null
+++ b/src/rabbit_vhost_sup_watcher.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% This module implements a watcher process which should stop
+%% the parent supervisor if its vhost is missing from the mnesia DB
+
+-module(rabbit_vhost_sup_watcher).
+
+-include("rabbit.hrl").
+
+-define(TICKTIME_RATIO, 4).
+
+-behaviour(gen_server2).
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+
+start_link(VHost) ->
+ gen_server2:start_link(?MODULE, [VHost], []).
+
+
+init([VHost]) ->
+ Interval = interval(),
+ timer:send_interval(Interval, check_vhost),
+ {ok, VHost}.
+
+handle_call(_,_,VHost) ->
+ {reply, ok, VHost}.
+
+handle_cast(_, VHost) ->
+ {noreply, VHost}.
+
+handle_info(check_vhost, VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ true -> {noreply, VHost};
+ false ->
+ rabbit_log:error(" Vhost \"~p\" is gone."
+ " Stopping message store supervisor.",
+ [VHost]),
+ {stop, normal, VHost}
+ end;
+handle_info(_, VHost) ->
+ {noreply, VHost}.
+
+terminate(_, _) -> ok.
+
+code_change(_OldVsn, VHost, _Extra) ->
+ {ok, VHost}.
+
+interval() ->
+ application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. \ No newline at end of file
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 326e0491d0..24398c477b 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -144,7 +144,7 @@ bytes(Words) -> try
catch
_:_ -> 0
end.
-
+%% TODO: per-vhost supervisor
interesting_sups() ->
[[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()].