summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- Makefile 4
-rw-r--r-- priv/schema/rabbitmq.schema 6
-rw-r--r-- src/rabbit.erl 19
-rw-r--r-- src/rabbit_amqqueue_sup_sup.erl 36
-rw-r--r-- src/rabbit_exchange.erl 9
-rw-r--r-- src/rabbit_mirror_queue_master.erl 6
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 4
-rw-r--r-- src/rabbit_msg_store_vhost_sup.erl 103
-rw-r--r-- src/rabbit_priority_queue.erl 10
-rw-r--r-- src/rabbit_queue_index.erl 64
-rw-r--r-- src/rabbit_recovery_terms.erl 114
-rw-r--r-- src/rabbit_sup.erl 23
-rw-r--r-- src/rabbit_upgrade.erl 3
-rw-r--r-- src/rabbit_variable_queue.erl 238
-rw-r--r-- src/rabbit_vhost.erl 44
-rw-r--r-- src/rabbit_vhost_msg_store.erl 61
-rw-r--r-- src/rabbit_vhost_sup.erl 34
-rw-r--r-- src/rabbit_vhost_sup_sup.erl 171
-rw-r--r-- src/rabbit_vhost_sup_watcher.erl 66
-rw-r--r-- src/rabbit_vhost_sup_wrapper.erl 53
-rw-r--r-- src/rabbit_vm.erl 86
-rw-r--r-- test/backing_queue_SUITE.erl 67
-rw-r--r-- test/channel_operation_timeout_test_queue.erl 108
-rw-r--r-- test/crashing_queues_SUITE.erl 12
24 files changed, 897 insertions, 444 deletions
diff --git a/Makefile b/Makefile
index 1510d8995a..0a27b81cfe 100644
--- a/Makefile
+++ b/Makefile
@@ -117,7 +117,9 @@ define PROJECT_ENV
%% rabbitmq-server-589
{proxy_protocol, false},
{disk_monitor_failure_retries, 10},
- {disk_monitor_failure_retry_interval, 120000}
+ {disk_monitor_failure_retry_interval, 120000},
+ %% either "stop_node" or "ignore"
+ {vhost_restart_strategy, stop_node}
]
endef
diff --git a/priv/schema/rabbitmq.schema b/priv/schema/rabbitmq.schema
index fab07baeb4..6ccea48022 100644
--- a/priv/schema/rabbitmq.schema
+++ b/priv/schema/rabbitmq.schema
@@ -949,6 +949,12 @@ end}.
{mapping, "proxy_protocol", "rabbit.proxy_protocol",
[{datatype, {enum, [true, false]}}]}.
+%% Whether to stop the rabbit application if VHost data
+%% cannot be recovered.
+
+{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy",
+ [{datatype, {enum, [stop_node, ignore]}}]}.
+
% ==========================
% Lager section
% ==========================
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5228984ad2..8e6c9ead26 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -147,12 +147,6 @@
[{description, "core initialized"},
{requires, kernel_ready}]}).
--rabbit_boot_step({empty_db_check,
- [{description, "empty DB check"},
- {mfa, {?MODULE, maybe_insert_default_data, []}},
- {requires, core_initialized},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({upgrade_queues,
[{description, "per-vhost message store migration"},
{mfa, {rabbit_upgrade,
@@ -164,7 +158,13 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
- {requires, core_initialized},
+ {requires, [core_initialized]},
+ {enables, routing_ready}]}).
+
+-rabbit_boot_step({empty_db_check,
+ [{description, "empty DB check"},
+ {mfa, {?MODULE, maybe_insert_default_data, []}},
+ {requires, recovery},
{enables, routing_ready}]}).
-rabbit_boot_step({mirrored_queues,
@@ -829,10 +829,7 @@ boot_delegate() ->
recover() ->
rabbit_policy:recover(),
- Qs = rabbit_amqqueue:recover(),
- ok = rabbit_binding:recover(rabbit_exchange:recover(),
- [QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs).
+ rabbit_vhost:recover().
maybe_insert_default_data() ->
case rabbit_table:needs_default_data() of
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index d0c55b2c0e..347dbbb48a 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -19,6 +19,8 @@
-behaviour(supervisor2).
-export([start_link/0, start_queue_process/3]).
+-export([start_for_vhost/1, stop_for_vhost/1,
+ find_for_vhost/2, find_for_vhost/1]).
-export([init/1]).
@@ -36,14 +38,42 @@
%%----------------------------------------------------------------------------
start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+ supervisor2:start_link(?MODULE, []).
start_queue_process(Node, Q, StartMode) ->
- {ok, _SupPid, QPid} = supervisor2:start_child(
- {?SERVER, Node}, [Q, StartMode]),
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ {ok, Sup} = find_for_vhost(VHost, Node),
+ {ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]),
QPid.
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
+
+%% Looks up the queue supervisor (the rabbit_amqqueue_sup_sup child) that
+%% belongs to VHost on the local node.
+-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
+find_for_vhost(VHost) ->
+ find_for_vhost(VHost, node()).
+
+%% Looks up the rabbit_amqqueue_sup_sup child of VHost's supervisor on Node.
+%% Returns {ok, Pid} when find_child/2 yields exactly one child; any other
+%% result is wrapped as {error, {queue_supervisor_not_found, Result}}.
+%% NOTE(review): the assertive match on vhost_sup/2 means a non-{ok, _}
+%% reply crashes with badmatch instead of producing the {error, _} that the
+%% spec advertises -- confirm this is intended.
+-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
+find_for_vhost(VHost, Node) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
+ [QSup] -> {ok, QSup};
+ Result -> {error, {queue_supervisor_not_found, Result}}
+ end.
+
+-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
+start_for_vhost(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ supervisor2:start_child(
+ VHostSup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+
+-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
+stop_for_vhost(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
+ ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4061098a9d..cc2797489d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, policy_changed/2, callback/4, declare/7,
+-export([recover/1, policy_changed/2, callback/4, declare/7,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
update_scratch/3, update_decorators/1, immutable/1,
@@ -36,7 +36,7 @@
-type type() :: atom().
-type fun_name() :: atom().
--spec recover() -> [name()].
+-spec recover(rabbit_types:vhost()) -> [name()].
-spec callback
(rabbit_types:exchange(), fun_name(),
fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'.
@@ -107,10 +107,11 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
policy, user_who_performed_action]).
-recover() ->
+recover(VHost) ->
Xs = rabbit_misc:table_filter(
fun (#exchange{name = XName}) ->
- mnesia:read({rabbit_exchange, XName}) =:= []
+ XName#resource.virtual_host =:= VHost andalso
+ mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
X1 = case Tx of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b9952178e0..fefa0de1c9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
--export([start/1, stop/0, delete_crashed/1]).
+-export([start/2, stop/1, delete_crashed/1]).
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
@@ -81,12 +81,12 @@
%% Backing queue
%% ---------------------------------------------------------------------------
-start(_DurableQueues) ->
+start(_Vhost, _DurableQueues) ->
%% This will never get called as this module will never be
%% installed as the default BQ implementation.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-stop() ->
+stop(_Vhost) ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 748a5afdf5..ee697be501 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -197,8 +197,10 @@ stop_pending_slaves(QName, Pids) ->
case erlang:process_info(Pid, dictionary) of
undefined -> ok;
{dictionary, Dict} ->
+ Vhost = QName#resource.virtual_host,
+ {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
case proplists:get_value('$ancestors', Dict) of
- [Sup, rabbit_amqqueue_sup_sup | _] ->
+ [Sup, AmqQSup | _] ->
exit(Sup, kill),
exit(Pid, kill);
_ ->
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
deleted file mode 100644
index 5031c5f043..0000000000
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ /dev/null
@@ -1,103 +0,0 @@
--module(rabbit_msg_store_vhost_sup).
-
--include("rabbit.hrl").
-
--behaviour(supervisor2).
-
--export([start_link/3, init/1, add_vhost/2, delete_vhost/2,
- client_init/5, successfully_recovered_state/2]).
-
-%% Internal
--export([start_store_for_vhost/4]).
-
-start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs);
- VhostsClientRefs == undefined ->
- supervisor2:start_link({local, Type}, ?MODULE,
- [Type, VhostsClientRefs, StartupFunState]).
-
-init([Type, VhostsClientRefs, StartupFunState]) ->
- ets:new(Type, [named_table, public]),
- {ok, {{simple_one_for_one, 1, 1},
- [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost,
- [Type, VhostsClientRefs, StartupFunState]},
- transient, infinity, supervisor, [rabbit_msg_store]}]}}.
-
-
-add_vhost(Type, VHost) ->
- VHostPid = maybe_start_store_for_vhost(Type, VHost),
- {ok, VHostPid}.
-
-start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid ->
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- ok = rabbit_file:ensure_dir(VHostDir),
- rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
- VhostRefs = refs_for_vhost(VHost, VhostsClientRefs),
- VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost),
- case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of
- {ok, Pid} ->
- ets:insert(Type, {VHost, Pid}),
- {ok, Pid};
- Other -> Other
- end;
- Pid when is_pid(Pid) ->
- {error, {already_started, Pid}}
- end.
-
-startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) ->
- QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames,
- VH == VHost ],
- {Fun, {start, QNamesForVhost}};
-startup_fun_state_for_vhost(State, _VHost) -> State.
-
-refs_for_vhost(_, undefined) -> undefined;
-refs_for_vhost(VHost, Refs) ->
- case maps:find(VHost, Refs) of
- {ok, Val} -> Val;
- error -> []
- end.
-
-
-delete_vhost(Type, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid -> ok;
- Pid when is_pid(Pid) ->
- supervisor2:terminate_child(Type, Pid),
- cleanup_vhost_store(Type, VHost, Pid)
- end,
- ok.
-
-client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
- VHostPid = maybe_start_store_for_vhost(Type, VHost),
- rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun).
-
-maybe_start_store_for_vhost(Type, VHost) ->
- case supervisor2:start_child(Type, [VHost]) of
- {ok, Pid} -> Pid;
- {error, {already_started, Pid}} -> Pid;
- Error -> throw(Error)
- end.
-
-vhost_store_pid(Type, VHost) ->
- case ets:lookup(Type, VHost) of
- [] -> no_pid;
- [{VHost, Pid}] ->
- case erlang:is_process_alive(Pid) of
- true -> Pid;
- false ->
- cleanup_vhost_store(Type, VHost, Pid),
- no_pid
- end
- end.
-
-cleanup_vhost_store(Type, VHost, Pid) ->
- ets:delete_object(Type, {VHost, Pid}).
-
-successfully_recovered_state(Type, VHost) ->
- case vhost_store_pid(Type, VHost) of
- no_pid ->
- throw({message_store_not_started, Type, VHost});
- Pid when is_pid(Pid) ->
- rabbit_msg_store:successfully_recovered_state(Pid)
- end.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 34e23260ba..41e65e8a1f 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -30,7 +30,7 @@
-export([enable/0]).
--export([start/1, stop/0]).
+-export([start/2, stop/1]).
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
@@ -83,22 +83,22 @@ enable() ->
%%----------------------------------------------------------------------------
-start(QNames) ->
+start(VHost, QNames) ->
BQ = bq(),
%% TODO this expand-collapse dance is a bit ridiculous but it's what
%% rabbit_amqqueue:recover/0 expects. We could probably simplify
%% this if we rejigged recovery a bit.
{DupNames, ExpNames} = expand_queues(QNames),
- case BQ:start(ExpNames) of
+ case BQ:start(VHost, ExpNames) of
{ok, ExpRecovery} ->
{ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
Else ->
Else
end.
-stop() ->
+stop(VHost) ->
BQ = bq(),
- BQ:stop().
+ BQ:stop(VHost).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 67f783a8dd..a71eaf1ff4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -17,16 +17,19 @@
-module(rabbit_queue_index).
-export([erase/1, init/3, reset_state/1, recover/6,
- terminate/2, delete_and_terminate/1,
+ terminate/3, delete_and_terminate/1,
pre_publish/7, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
- read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
+ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3]).
%% Migrates from global to per-vhost message stores
--export([move_to_per_vhost_stores/1, update_recovery_term/2]).
+-export([move_to_per_vhost_stores/1,
+ update_recovery_term/2,
+ read_global_recovery_terms/1,
+ cleanup_global_recovery_terms/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -261,7 +264,7 @@
on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}.
--spec terminate([any()], qistate()) -> qistate().
+-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
-spec delete_and_terminate(qistate()) -> qistate().
-spec publish(rabbit_types:msg_id(), seq_id(),
rabbit_types:message_properties(), boolean(),
@@ -278,7 +281,7 @@
-spec next_segment_boundary(seq_id()) -> seq_id().
-spec bounds(qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}.
--spec start([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
+-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
-spec add_queue_ttl() -> 'ok'.
@@ -321,9 +324,9 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State = #qistate { dir = Dir }) ->
+terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_terms:store(filename:basename(Dir),
+ rabbit_recovery_terms:store(VHost, filename:basename(Dir),
[{segments, SegmentCounts} | Terms]),
State1.
@@ -491,34 +494,63 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
-start(DurableQueueNames) ->
- ok = rabbit_recovery_terms:start(),
+start(VHost, DurableQueueNames) ->
+ ok = rabbit_recovery_terms:start(VHost),
{DurableTerms, DurableDirectories} =
lists:foldl(
fun(QName, {RecoveryTerms, ValidDirectories}) ->
DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
+ RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
{error, _} -> non_clean_shutdown;
{ok, Terms} -> Terms
end,
{[RecoveryInfo | RecoveryTerms],
sets:add_element(DirName, ValidDirectories)}
end, {[], sets:new()}, DurableQueueNames),
-
%% Any queue directory we've not been asked to recover is considered garbage
rabbit_file:recursive_delete(
[DirName ||
- DirName <- all_queue_directory_names(),
+ DirName <- all_queue_directory_names(VHost),
not sets:is_element(filename:basename(DirName), DurableDirectories)]),
+ rabbit_recovery_terms:clear(VHost),
- rabbit_recovery_terms:clear(),
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/2 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+
+read_global_recovery_terms(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:open_global_table(),
+
+ DurableTerms =
+ lists:foldl(
+ fun(QName, RecoveryTerms) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ [RecoveryInfo | RecoveryTerms]
+ end, [], DurableQueueNames),
+ ok = rabbit_recovery_terms:close_global_table(),
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
OrderedTerms = lists:reverse(DurableTerms),
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-stop() -> rabbit_recovery_terms:stop().
+cleanup_global_recovery_terms() ->
+ rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
+ rabbit_recovery_terms:delete_global_table(),
+ ok.
+
+
+stop(VHost) -> rabbit_recovery_terms:stop(VHost).
+
+all_queue_directory_names(VHost) ->
+ filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
+ "queues", "*"])).
all_queue_directory_names() ->
filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
@@ -1447,6 +1479,6 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
end,
ok.
-update_recovery_term(#resource{} = QueueName, Term) ->
+update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
Key = queue_name_to_dir_name(QueueName),
- rabbit_recovery_terms:store(Key, Term).
+ rabbit_recovery_terms:store(VHost, Key, Term).
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index c941126cd3..be9b1b6227 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -21,51 +21,69 @@
-behaviour(gen_server).
--export([start/0, stop/0, store/2, read/1, clear/0]).
+-export([start/1, stop/1, store/3, read/2, clear/1]).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+-export([open_global_table/0, close_global_table/0,
+ read_global/1, delete_global_table/0]).
+-export([open_table/1, close_table/1]).
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
-%%----------------------------------------------------------------------------
-
--spec start() -> rabbit_types:ok_or_error(term()).
--spec stop() -> rabbit_types:ok_or_error(term()).
--spec store(file:filename(), term()) -> rabbit_types:ok_or_error(term()).
--spec read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
--spec clear() -> 'ok'.
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
--define(SERVER, ?MODULE).
+-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
+-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
+-spec clear(rabbit_types:vhost()) -> 'ok'.
-start() -> rabbit_sup:start_child(?MODULE).
+%%----------------------------------------------------------------------------
-stop() -> rabbit_sup:stop_child(?MODULE).
+%% Starts the recovery terms worker for VHost as a transient child (id
+%% ?MODULE) of that vhost's supervisor. Returns ok; if the supervisor lookup
+%% or the child start does not return {ok, _}, the assertive matches below
+%% fail with badmatch.
+start(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, _} = supervisor2:start_child(
+ VHostSup,
+ {?MODULE,
+ {?MODULE, start_link, [VHost]},
+ transient, ?WORKER_WAIT, worker,
+ [?MODULE]}),
+ ok.
+
+%% Terminates the recovery terms worker of VHost and removes its child spec
+%% from the vhost supervisor. Returns the result of delete_child/2 on
+%% success, or the terminate_child/2 error unchanged.
+%% The child is registered through supervisor2:start_child/2 in start/1, so
+%% the same supervisor2 API is used here for consistency.
+stop(VHost) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ case supervisor2:terminate_child(VHostSup, ?MODULE) of
+ ok -> supervisor2:delete_child(VHostSup, ?MODULE);
+ E -> E
+ end.
-store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}).
+store(VHost, DirBaseName, Terms) ->
+ dets:insert(VHost, {DirBaseName, Terms}).
-read(DirBaseName) ->
- case dets:lookup(?MODULE, DirBaseName) of
+read(VHost, DirBaseName) ->
+ case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-clear() ->
- ok = dets:delete_all_objects(?MODULE),
- flush().
+clear(VHost) ->
+ ok = dets:delete_all_objects(VHost),
+ flush(VHost).
-start_link() -> gen_server:start_link(?MODULE, [], []).
+start_link(VHost) ->
+ gen_server:start_link(?MODULE, [VHost], []).
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
- open_table(),
+ open_global_table(),
try
QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
Dirs = case rabbit_file:list_dir(QueuesDir) of
@@ -75,37 +93,54 @@ upgrade_recovery_terms() ->
[begin
File = filename:join([QueuesDir, Dir, "clean.dot"]),
case rabbit_file:read_term_file(File) of
- {ok, Terms} -> ok = store(Dir, Terms);
+ {ok, Terms} -> ok = store(?MODULE, Dir, Terms);
{error, _} -> ok
end,
file:delete(File)
end || Dir <- Dirs],
ok
after
- close_table()
+ close_global_table()
end.
persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
dets_upgrade(Fun)->
- open_table(),
+ open_global_table(),
try
ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
- store(DirBaseName, Fun(Terms)),
+ store(?MODULE, DirBaseName, Fun(Terms)),
Acc
end, ok, ?MODULE),
ok
after
- close_table()
+ close_global_table()
end.
+%% Opens the node-wide recovery terms DETS table (as opposed to the
+%% per-vhost tables opened by open_table/1). The table is named ?MODULE and
+%% backed by "recovery.dets" in the mnesia directory. Returns ok; a failed
+%% open crashes on the assertive match.
+open_global_table() ->
+ File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
+ {ok, _} = dets:open_file(?MODULE, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]),
+ ok.
+
+%% Syncs the global table and then closes it.
+close_global_table() ->
+ ok = dets:sync(?MODULE),
+ ok = dets:close(?MODULE).
+
+%% Reads the recovery terms stored under DirBaseName in the global table by
+%% passing ?MODULE as the table name to read/2.
+read_global(DirBaseName) ->
+ read(?MODULE, DirBaseName).
+
+%% Deletes the global table's backing file and returns the result of
+%% file:delete/1.
+delete_global_table() ->
+ file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")).
+
%%----------------------------------------------------------------------------
-init(_) ->
+init([VHost]) ->
process_flag(trap_exit, true),
- open_table(),
- {ok, undefined}.
+ open_table(VHost),
+ {ok, VHost}.
handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
@@ -113,22 +148,23 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
handle_info(_Info, State) -> {noreply, State}.
-terminate(_Reason, _State) ->
- close_table().
+terminate(_Reason, VHost) ->
+ close_table(VHost).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-open_table() ->
- File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
- {ok, _} = dets:open_file(?MODULE, [{file, File},
- {ram_file, true},
- {auto_save, infinity}]).
+open_table(VHost) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ File = filename:join(VHostDir, "recovery.dets"),
+ {ok, _} = dets:open_file(VHost, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]).
-flush() -> ok = dets:sync(?MODULE).
+flush(VHost) -> ok = dets:sync(VHost).
-close_table() ->
- ok = flush(),
- ok = dets:close(?MODULE).
+close_table(VHost) ->
+ ok = flush(VHost),
+ ok = dets:close(VHost).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 38d561fa80..0622d16e61 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -16,7 +16,7 @@
-module(rabbit_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4,
start_supervisor_child/1, start_supervisor_child/2,
@@ -25,7 +25,7 @@
start_delayed_restartable_child/1, start_delayed_restartable_child/2,
stop_child/1]).
--export([init/1]).
+-export([init/1, prep_stop/0]).
-include("rabbit.hrl").
@@ -49,20 +49,20 @@
%%----------------------------------------------------------------------------
-start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
start_child(Mod) -> start_child(Mod, []).
start_child(Mod, Args) -> start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- child_reply(supervisor:start_child(
+ child_reply(supervisor2:start_child(
?SERVER,
{ChildId, {Mod, start_link, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
start_child(ChildId, Mod, Fun, Args) ->
- child_reply(supervisor:start_child(
+ child_reply(supervisor2:start_child(
?SERVER,
{ChildId, {Mod, Fun, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
@@ -73,7 +73,7 @@ start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
start_supervisor_child(ChildId, Mod, Args) ->
- child_reply(supervisor:start_child(
+ child_reply(supervisor2:start_child(
?SERVER,
{ChildId, {Mod, start_link, Args},
transient, infinity, supervisor, [Mod]})).
@@ -85,20 +85,25 @@ start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true).
start_restartable_child(Mod, Args, Delay) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
- child_reply(supervisor:start_child(
+ child_reply(supervisor2:start_child(
?SERVER,
{Name, {rabbit_restartable_sup, start_link,
[Name, {Mod, start_link, Args}, Delay]},
transient, infinity, supervisor, [rabbit_restartable_sup]})).
stop_child(ChildId) ->
- case supervisor:terminate_child(?SERVER, ChildId) of
- ok -> supervisor:delete_child(?SERVER, ChildId);
+ case supervisor2:terminate_child(?SERVER, ChildId) of
+ ok -> supervisor2:delete_child(?SERVER, ChildId);
E -> E
end.
init([]) -> {ok, {{one_for_all, 0, 1}, []}}.
+%% Stops the applications reported by rabbit_plugins:active(), in the order
+%% computed by app_utils:app_dependency_order/2, logging before and after.
+%% NOTE(review): the `true` argument presumably requests reverse dependency
+%% order -- confirm against app_utils.
+prep_stop() ->
+ rabbit_log:info("Stopping dependencies...~n",[]),
+ Apps = rabbit_plugins:active(),
+ rabbit:stop_apps(app_utils:app_dependency_order(Apps, true)),
+ rabbit_log:info("Dependencies stopped...~n",[]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index eae464219a..ed2143a2b9 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -262,7 +262,8 @@ maybe_upgrade_local() ->
maybe_migrate_queues_to_per_vhost_storage() ->
Result = case rabbit_version:upgrades_required(message_store) of
{error, version_not_available} -> version_not_available;
- {error, starting_from_scratch} -> starting_from_scratch;
+ {error, starting_from_scratch} ->
+ starting_from_scratch;
{error, _} = Err -> throw(Err);
{ok, []} -> ok;
{ok, Upgrades} -> apply_upgrades(message_store,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 300da96441..86321003c9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -28,19 +28,17 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
--export([start/1, stop/0]).
-
-%% exported for parallel map
--export([add_vhost_msg_store/1]).
+-export([start/2, stop/1]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/3, stop_msg_store/1, init/6]).
-export([move_messages_to_vhost_store/0]).
--export([stop_vhost_msg_store/1]).
+
-include_lib("stdlib/include/qlc.hrl").
-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
@@ -320,7 +318,10 @@
%% number of reduce_memory_usage executions, once it
%% reaches a threshold the queue will manually trigger a runtime GC
%% see: maybe_execute_gc/1
- memory_reduction_run_count
+ memory_reduction_run_count,
+ %% Queue data is grouped by VHost. We need to store it
+ %% to work with queue index.
+ virtual_host
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -345,8 +346,6 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -477,66 +476,37 @@ explicit_gc_run_operation_threshold_for_mode(Mode) ->
%% Public API
%%----------------------------------------------------------------------------
-start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
+start(VHost, DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
%% Group recovery terms by vhost.
- {[], VhostRefs} = lists:foldl(
- fun
- %% We need to skip a queue name
- (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
- {QNames, VhostRefs};
- (Terms, {[QueueName | QNames], VhostRefs}) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {QNames, VhostRefs};
- Ref ->
- #resource{virtual_host = VHost} = QueueName,
- Refs = case maps:find(VHost, VhostRefs) of
- {ok, Val} -> Val;
- error -> []
- end,
- {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
- end
- end,
- {DurableQueues, #{}},
- AllTerms),
- start_msg_store(VhostRefs, StartFunState),
+ ClientRefs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
-stop() ->
- ok = stop_msg_store(),
- ok = rabbit_queue_index:stop().
-
-start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
- [?TRANSIENT_MSG_STORE_SUP,
- undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]),
- %% Start message store for all known vhosts
- VHosts = rabbit_vhost:list(),
- %% TODO: recovery is limited by queue index recovery
- %% pool size. There is no point in parallelizing vhost
- %% recovery until there will be a queue index
- %% recovery pool per vhost
- lists:foreach(fun(Vhost) ->
- add_vhost_msg_store(Vhost)
- end,
- lists:sort(VHosts)),
- ok.
+stop(VHost) ->
+ ok = stop_msg_store(VHost),
+ ok = rabbit_queue_index:stop(VHost).
-add_vhost_msg_store(VHost) ->
+start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?TRANSIENT_MSG_STORE,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?PERSISTENT_MSG_STORE,
+ Refs,
+ StartFunState),
rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
-stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
-
-stop_vhost_msg_store(VHost) ->
- rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
- rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
+stop_msg_store(VHost) ->
+ rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
+ rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
ok.
init(Queue, Recover, Callback) ->
@@ -555,12 +525,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
- AsyncCallback, VHost));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
+ AsyncCallback, VHost), VHost);
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
@@ -569,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
@@ -579,17 +549,18 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback,
VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store_vhost_sup:successfully_recovered_state(
- ?PERSISTENT_MSG_STORE_SUP, VHost),
+ rabbit_vhost_msg_store:successfully_recovered_state(
+ VHost,
+ ?PERSISTENT_MSG_STORE),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient).
+ PersistentClient, TransientClient, VHost).
process_recovery_terms(Terms=non_clean_shutdown) ->
{rabbit_guid:gen(), Terms};
@@ -600,23 +571,24 @@ process_recovery_terms(Terms) ->
end.
terminate(_Reason, State) ->
- State1 = #vqstate { persistent_count = PCount,
+ State1 = #vqstate { virtual_host = VHost,
+ persistent_count = PCount,
persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
- _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
+ _ -> ok = maybe_client_terminate(MSCStateP),
rabbit_msg_store:client_ref(MSCStateP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
Terms = [{persistent_ref, PRef},
{persistent_count, PCount},
{persistent_bytes, PBytes}],
- a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
- msg_store_clients = undefined }).
+ a(State1#vqstate {
+ index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState),
+ msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -1270,12 +1242,12 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
Callback, VHost).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
- rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () ->
- Callback(?MODULE, CloseFDsFun)
- end,
- VHost).
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
+ rabbit_vhost_msg_store:client_init(VHost, MsgStore,
+ Ref, MsgOnDiskFun,
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
+ end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -1379,7 +1351,7 @@ expand_delta(_SeqId, #delta { count = Count,
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient) ->
+ PersistentClient, TransientClient, VHost) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{DeltaCount1, DeltaBytes1} =
@@ -1446,7 +1418,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
mode = default,
- memory_reduction_run_count = 0},
+ memory_reduction_run_count = 0,
+ virtual_host = VHost},
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -2771,16 +2744,21 @@ multiple_routing_keys() ->
%% Assumes message store is not running
transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+%% Upgrade step entry point: collects the persistent queues and hands
+%% them to move_messages_to_vhost_store/1. There is no short-circuit for
+%% an empty queue list, so the migration steps run in that case as well.
move_messages_to_vhost_store() ->
+    move_messages_to_vhost_store(list_persistent_queues()).
+
+move_messages_to_vhost_store(Queues) ->
log_upgrade("Moving messages to per-vhost message store"),
- Queues = list_persistent_queues(),
%% Move the queue index for each persistent queue to the new store
lists:foreach(
fun(Queue) ->
@@ -2791,30 +2769,30 @@ move_messages_to_vhost_store() ->
%% Legacy (global) msg_store may require recovery.
%% This upgrade step should only be started
%% if we are upgrading from a pre-3.7.0 version.
- {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues),
+ {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues),
OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
+
+ VHosts = rabbit_vhost:list(),
+
%% New store should not be recovered.
- NewStoreSup = start_new_store_sup(),
- Vhosts = rabbit_vhost:list(),
- lists:foreach(fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost)
- end,
- Vhosts),
+ NewMsgStore = start_new_store(VHosts),
+ %% Recovery terms should be started for all vhosts for new store.
+ [{ok, _} = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],
+
MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
?QUEUE_MIGRATION_BATCH_SIZE),
in_batches(MigrationBatchSize,
- {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]},
+ {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
QueuesWithTerms,
"message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
log_upgrade("Message store migration finished"),
- delete_old_store(OldStore),
-
- ok = rabbit_queue_index:stop(),
- ok = rabbit_sup:stop_child(NewStoreSup),
- ok.
+ ok = delete_old_store(OldStore),
+ ok = rabbit_queue_index:cleanup_global_recovery_terms(),
+ [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
+ ok = stop_new_store(NewMsgStore).
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
@@ -2840,12 +2818,14 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
-migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) ->
+migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
+ RecoveryTerm},
+ OldStore, NewStore) ->
log_upgrade_verbose(
"Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
[Name, VHost]),
OldStoreClient = get_global_store_client(OldStore),
- NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup),
+ NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
@@ -2881,12 +2861,10 @@ migrate_message(MsgId, OldC, NewC) ->
_ -> OldC
end.
-get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) ->
- rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
- rabbit_guid:gen(),
- fun(_,_) -> ok end,
- fun() -> ok end,
- VHost).
+%% Creates a message store client for the store that belongs to the
+%% queue's vhost. NewStore is the [{VHost, StorePid}] list built by
+%% start_new_store/1; a vhost missing from it fails the match below.
+%% The client gets a freshly generated ref and no-op callbacks.
+get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
+    {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
+    ClientRef = rabbit_guid:gen(),
+    MsgOnDiskFun = fun(_, _) -> ok end,
+    CloseFDsFun = fun() -> ok end,
+    rabbit_msg_store:client_init(StorePid, ClientRef,
+                                 MsgOnDiskFun, CloseFDsFun).
get_global_store_client(OldStore) ->
rabbit_msg_store:client_init(OldStore,
@@ -2905,9 +2883,11 @@ list_persistent_queues() ->
mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
-start_recovery_terms(Queues) ->
+read_old_recovery_terms([]) ->
+ {[], [], ?EMPTY_START_FUN_STATE};
+read_old_recovery_terms(Queues) ->
QueueNames = [Name || #amqqueue{name = Name} <- Queues],
- {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
+ {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames),
Refs = [Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
begin
@@ -2923,13 +2903,25 @@ run_old_persistent_store(Refs, StartFunState) ->
Refs, StartFunState]),
OldStoreName.
-start_new_store_sup() ->
- % Start persistent store sup without recovery.
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
- rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE_SUP,
- undefined, {fun (ok) -> finished end, ok}]),
- ?PERSISTENT_MSG_STORE_SUP.
+%% Starts one persistent message store process per vhost, linked to the
+%% calling process, and returns the result as a [{VHost, StorePid}] list.
+start_new_store(VHosts) ->
+    [{VHost, start_new_vhost_store(VHost)} || VHost <- VHosts].
+
+%% Starts the persistent store for a single vhost in that vhost's
+%% message store directory, with `undefined` client refs and the empty
+%% start fun state. Crashes with badmatch if the store fails to start.
+start_new_vhost_store(VHost) ->
+    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+    {ok, StorePid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
+                                                 VHostDir,
+                                                 undefined,
+                                                 ?EMPTY_START_FUN_STATE),
+    StorePid.
+
+%% Asks every store in the [{VHost, StorePid}] list to shut down.
+%% Each pid is unlinked first so that its exit does not propagate to
+%% the caller; the exit signal is sent without waiting for termination.
+stop_new_store(NewStore) ->
+    StopOne = fun({_VHost, Pid}) ->
+                  unlink(Pid),
+                  exit(Pid, shutdown)
+              end,
+    ok = lists:foreach(StopOne, NewStore).
delete_old_store(OldStore) ->
ok = rabbit_sup:stop_child(OldStore),
@@ -2937,7 +2929,8 @@ delete_old_store(OldStore) ->
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
%% Delete old transient store as well
rabbit_file:recursive_delete(
- [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).
+ [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]),
+ ok.
log_upgrade(Msg) ->
log_upgrade(Msg, []).
@@ -2950,3 +2943,14 @@ log_upgrade_verbose(Msg) ->
log_upgrade_verbose(Msg, Args) ->
rabbit_log_upgrade:info(Msg, Args).
+
+%% Best-effort termination of the persistent message store client.
+%%
+%% The queue might have been asked to stop by its supervisor. It needs a
+%% clean shutdown for the supervision strategy to work: a crash here
+%% counts as a restart, and reaching the restart limit might bring the
+%% whole vhost down. Any exception is therefore logged and turned into
+%% `ok` instead of being propagated.
+maybe_client_terminate(MSCStateP) ->
+    try
+        rabbit_msg_store:client_terminate(MSCStateP)
+    catch
+        Class:Reason ->
+            %% Keep the failure visible in the log rather than
+            %% swallowing it silently.
+            rabbit_log:warning("Ignoring failure to terminate persistent "
+                               "message store client: ~p:~p~n",
+                               [Class, Reason]),
+            ok
+    end.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 5ed23d9114..1d1ea16cca 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -20,11 +20,12 @@
%%----------------------------------------------------------------------------
+-export([recover/0, recover/1]).
-export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
--export([purge_messages/1]).
+-export([delete_storage/1]).
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
@@ -42,6 +43,32 @@
-spec info_all(rabbit_types:info_keys(), reference(), pid()) ->
'ok'.
+%% Node-level recovery entry point.
+%%
+%% Clears remnants of a previous incarnation of this node (in case it
+%% restarted faster than other nodes handled its DOWN messages), emits
+%% the file limit warning, starts rabbit_vhost_sup_sup and then makes
+%% sure a vhost supervisor exists for every known vhost. The per-vhost
+%% recovery itself is triggered by those supervisors, so it runs again
+%% every time a vhost supervisor is restarted.
+recover() ->
+    rabbit_amqqueue:on_node_down(node()),
+    rabbit_amqqueue:warn_file_limit(),
+    ok = rabbit_vhost_sup_sup:start(),
+    EnsureSup = fun(VHost) ->
+                    {ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
+                end,
+    lists:foreach(EnsureSup, rabbit_vhost:list()).
+
+%% Recovers a single vhost: ensures its data directory exists and holds
+%% a ".vhost" stub file containing the vhost name, then recovers queues,
+%% exchanges and bindings, and finally starts the recovered queues.
+%% Every step is asserted, so a failure crashes the caller.
+recover(VHost) ->
+    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+    rabbit_log:info("Making sure data directory '~s' for vhost '~s' exists~n",
+                    [VHostDir, VHost]),
+    StubFile = filename:join(VHostDir, ".vhost"),
+    ok = rabbit_file:ensure_dir(StubFile),
+    ok = file:write_file(StubFile, VHost),
+    Queues = rabbit_amqqueue:recover(VHost),
+    Exchanges = rabbit_exchange:recover(VHost),
+    QueueNames = [QName || #amqqueue{name = QName} <- Queues],
+    ok = rabbit_binding:recover(Exchanges, QueueNames),
+    ok = rabbit_amqqueue:start(Queues).
+
%%----------------------------------------------------------------------------
-define(INFO_KEYS, [name, tracing]).
@@ -75,6 +102,7 @@ add(VHostPath, ActingUser) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
+ ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
rabbit_event:notify(vhost_created, info(VHostPath)
++ [{user_who_performed_action, ActingUser}]),
R.
@@ -96,17 +124,16 @@ delete(VHostPath, ActingUser) ->
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
[ok = Fun() || Fun <- Funs],
+ %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
+ %% on all the nodes.
+ rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
-purge_messages(VHost) ->
+delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
- %% Message store is stopped to close file handles
- rabbit_variable_queue:stop_vhost_msg_store(VHost),
- ok = rabbit_file:recursive_delete([VhostDir]),
- %% Ensure the store is terminated even if it was restarted during the delete operation
- %% above.
- rabbit_variable_queue:stop_vhost_msg_store(VHost).
+ %% Message store should be closed when vhost supervisor is closed.
+ ok = rabbit_file:recursive_delete([VhostDir]).
assert_benign(ok, _) -> ok;
assert_benign({ok, _}, _) -> ok;
@@ -134,7 +161,6 @@ internal_delete(VHostPath, ActingUser) ->
Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser)
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
- purge_messages(VHostPath),
Fs1 ++ Fs2.
exists(VHostPath) ->
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
new file mode 100644
index 0000000000..482ad082b8
--- /dev/null
+++ b/src/rabbit_vhost_msg_store.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_msg_store).
+
+-include("rabbit.hrl").
+
+-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]).
+
+
+%% Starts a message store of the given Type as a transient worker child
+%% of the vhost's supervisor, using the vhost's message store directory.
+%% ClientRefs must be a list or `undefined`. Returns whatever
+%% supervisor2:start_child/2 returns.
+start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
+                                                     ClientRefs == undefined ->
+    {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+    VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+    StartMFA = {rabbit_msg_store, start_link,
+                [Type, VHostDir, ClientRefs, StartupFunState]},
+    ChildSpec = {Type, StartMFA,
+                 transient, ?WORKER_WAIT, worker, [rabbit_msg_store]},
+    supervisor2:start_child(VHostSup, ChildSpec).
+
+%% Terminates the message store child named Type under the vhost's
+%% supervisor and removes its child spec. Both supervisor calls are
+%% matched against `ok`, so a failure of either crashes with badmatch.
+stop(VHost, Type) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ ok = supervisor2:terminate_child(VHostSup, Type),
+ ok = supervisor2:delete_child(VHostSup, Type).
+
+%% Initialises a message store client against the store of the given
+%% Type in VHost. Throws {message_store_not_started, Type, VHost} (via
+%% with_vhost_store/3) when that store is not running.
+client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
+ with_vhost_store(VHost, Type, fun(StorePid) ->
+ rabbit_msg_store:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun)
+ end).
+
+%% Applies Fun to the pid of the message store of the given Type in
+%% VHost and returns its result. Throws
+%% {message_store_not_started, Type, VHost} if no such store is running.
+with_vhost_store(VHost, Type, Fun) ->
+    case vhost_store_pid(VHost, Type) of
+        StorePid when is_pid(StorePid) ->
+            Fun(StorePid);
+        no_pid ->
+            throw({message_store_not_started, Type, VHost})
+    end.
+
+%% Looks up the child named Type under the vhost's supervisor.
+%% Returns its pid, or `no_pid` when the supervisor has no such child.
+%% Obtaining the supervisor goes through rabbit_vhost_sup_sup:vhost_sup/1,
+%% whose result is matched against {ok, _} here.
+vhost_store_pid(VHost, Type) ->
+ {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ case supervisor2:find_child(VHostSup, Type) of
+ [Pid] -> Pid;
+ [] -> no_pid
+ end.
+
+%% Asks the message store of the given Type in VHost for its
+%% successfully-recovered state. Throws (via with_vhost_store/3) when
+%% that store is not running.
+successfully_recovered_state(VHost, Type) ->
+ with_vhost_store(VHost, Type, fun(StorePid) ->
+ rabbit_msg_store:successfully_recovered_state(StorePid)
+ end). \ No newline at end of file
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
new file mode 100644
index 0000000000..b8c7a649e5
--- /dev/null
+++ b/src/rabbit_vhost_sup.erl
@@ -0,0 +1,34 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup).
+
+-include("rabbit.hrl").
+
+%% Supervisor is a per-vhost supervisor to contain queues and message stores
+-behaviour(supervisor2).
+-export([init/1]).
+-export([start_link/1]).
+
+%% Starts an unregistered supervisor2 process for VHost with this module
+%% as its callback; init/1 receives [VHost].
+start_link(VHost) ->
+ supervisor2:start_link(?MODULE, [VHost]).
+
+%% supervisor2 callback. The only statically declared child is the
+%% vhost watcher, registered with the `intrinsic` restart type.
+%% NOTE(review): presumably a normal exit of the watcher is what takes
+%% this supervisor down when the vhost disappears -- confirm against
+%% supervisor2's `intrinsic` semantics.
+init([VHost]) ->
+    {ok, {{one_for_all, 0, 1},
+          [{rabbit_vhost_sup_watcher,
+            {rabbit_vhost_sup_watcher, start_link, [VHost]},
+            intrinsic, ?WORKER_WAIT, worker,
+            %% Modules must name the child's own callback module.
+            [rabbit_vhost_sup_watcher]}]}}.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
new file mode 100644
index 0000000000..e528d64e0e
--- /dev/null
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -0,0 +1,171 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup_sup).
+
+-include("rabbit.hrl").
+
+-behaviour(supervisor2).
+
+-export([init/1]).
+
+-export([start_link/0, start/0]).
+-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([delete_on_all_nodes/1]).
+-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
+
+%% Internal
+-export([stop_and_delete_vhost/1]).
+
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
+
+%% Registers this module as a permanent supervisor child of rabbit_sup.
+%% Returns `ok` on success or the `{error, _}` tuple from the supervisor.
+start() ->
+    ChildSpec = {?MODULE, {?MODULE, start_link, []},
+                 permanent, infinity, supervisor, [?MODULE]},
+    case supervisor:start_child(rabbit_sup, ChildSpec) of
+        {error, _} = Error -> Error;
+        {ok, _}            -> ok
+    end.
+
+%% Starts the supervisor, locally registered under this module's name.
+start_link() ->
+ supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor2 callback.
+%%
+%% Maps the `vhost_restart_strategy` application setting (default
+%% `stop_node`) to the restart type of the per-vhost wrapper children:
+%% `stop_node`/`permanent` give permanent children, `ignore`/`transient`
+%% give transient ones. Any other value crashes with case_clause.
+%% Also creates the public named ETS table, keyed by vhost, that holds
+%% the #vhost_sup{} records.
+init([]) ->
+    Strategy = application:get_env(rabbit, vhost_restart_strategy, stop_node),
+    RestartType = case Strategy of
+                      stop_node -> permanent;
+                      permanent -> permanent;
+                      ignore    -> transient;
+                      transient -> transient
+                  end,
+    ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
+    WrapperSpec = {rabbit_vhost,
+                   {rabbit_vhost_sup_wrapper, start_link, []},
+                   RestartType, infinity, supervisor,
+                   [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]},
+    {ok, {{simple_one_for_one, 0, 5}, [WrapperSpec]}}.
+
+%% Starts the vhost supervisor for VHost on every running node.
+%% Crashes with badmatch as soon as one node fails to return {ok, _}.
+start_on_all_nodes(VHost) ->
+    StartOn = fun(Node) -> {ok, _} = start_vhost(VHost, Node) end,
+    lists:foreach(StartOn, rabbit_nodes:all_running()).
+
+%% Stops the vhost supervisor and deletes the vhost storage on every
+%% running node. Per-node results are ignored; always returns `ok`.
+delete_on_all_nodes(VHost) ->
+    StopOn = fun(Node) -> stop_and_delete_vhost(VHost, Node) end,
+    lists:foreach(StopOn, rabbit_nodes:all_running()).
+
+%% Starts (or finds) the vhost supervisor for VHost on Node.
+%%
+%% Runs locally when Node is this node, otherwise over RPC. Returns
+%% {ok, Pid} on success. An {error, _} produced by the remote
+%% start_vhost/1 (for example {error, {no_such_vhost, VHost}}) is passed
+%% through unchanged, and an RPC failure is reported as {error, Reason}.
+start_vhost(VHost, Node) when Node == node(self()) ->
+    start_vhost(VHost);
+start_vhost(VHost, Node) ->
+    case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
+        {ok, Pid} when is_pid(Pid) ->
+            {ok, Pid};
+        {error, _} = Err ->
+            %% The remote call itself worked but the vhost could not be
+            %% started there; hand the reason to the caller instead of
+            %% dying with case_clause.
+            Err;
+        {badrpc, RpcErr} ->
+            {error, RpcErr}
+    end.
+
+%% Returns {ok, Pid} of the local vhost supervisor for VHost, starting
+%% it first if necessary. Returns {error, {no_such_vhost, VHost}} when
+%% the vhost does not exist.
+start_vhost(VHost) ->
+    case rabbit_vhost:exists(VHost) of
+        true  -> find_or_start_vhost_sup(VHost);
+        false -> {error, {no_such_vhost, VHost}}
+    end.
+
+%% Looks the supervisor up in the ETS table; when there is no live one,
+%% starts a new wrapper child and reads the pid back from the table.
+%% Start errors other than already_started are thrown.
+find_or_start_vhost_sup(VHost) ->
+    case vhost_sup_pid(VHost) of
+        {ok, Pid} when is_pid(Pid) ->
+            {ok, Pid};
+        no_pid ->
+            case supervisor2:start_child(?MODULE, [VHost]) of
+                {ok, _}                       -> ok;
+                {error, {already_started, _}} -> ok;
+                Error                         -> throw(Error)
+            end,
+            {ok, _} = vhost_sup_pid(VHost)
+    end.
+
+%% Stops the local vhost supervisor tree for VHost and deletes the
+%% vhost's storage. Returns `ok` when nothing is recorded for the vhost
+%% or when its wrapper process is no longer alive.
+stop_and_delete_vhost(VHost) ->
+    case get_vhost_sup(VHost) of
+        #vhost_sup{} = VHostSup -> stop_and_delete_vhost(VHost, VHostSup, local);
+        not_found               -> ok
+    end.
+
+%% Terminates the wrapper child if it is alive. Only after a successful
+%% termination is the ETS record removed and the storage deleted; any
+%% other result of terminate_child/2 is returned as is.
+stop_and_delete_vhost(VHost,
+                      #vhost_sup{wrapper_pid = WrapperPid,
+                                 vhost_sup_pid = VHostSupPid} = VHostSup,
+                      local) ->
+    case is_process_alive(WrapperPid) of
+        false ->
+            ok;
+        true ->
+            rabbit_log:info("Stopping vhost supervisor ~p"
+                            " for vhost ~p~n",
+                            [VHostSupPid, VHost]),
+            case supervisor2:terminate_child(?MODULE, WrapperPid) of
+                ok ->
+                    ets:delete_object(?MODULE, VHostSup),
+                    ok = rabbit_vhost:delete_storage(VHost);
+                Other ->
+                    Other
+            end
+    end.
+
+%% Stops the vhost supervisor and deletes vhost storage on Node.
+%%
+%% We take an optimistic approach when stopping a remote vhost
+%% supervisor: failures are logged and returned as {error, Reason}
+%% instead of crashing the caller. That covers both a failed RPC and an
+%% {error, _} returned by the remote stop_and_delete_vhost/1.
+stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
+    stop_and_delete_vhost(VHost);
+stop_and_delete_vhost(VHost, Node) ->
+    case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, stop_and_delete_vhost, [VHost]) of
+        ok -> ok;
+        {error, Reason} = Err ->
+            %% The remote node answered, but could not stop the vhost.
+            rabbit_log:error("Failed to stop and delete a vhost ~p"
+                             " on node ~p."
+                             " Reason: ~p",
+                             [VHost, Node, Reason]),
+            Err;
+        {badrpc, RpcErr} ->
+            rabbit_log:error("Failed to stop and delete a vhost ~p"
+                             " on node ~p."
+                             " Reason: ~p",
+                             [VHost, Node, RpcErr]),
+            {error, RpcErr}
+    end.
+
+%% Returns the vhost supervisor pid for VHost on Node, starting the
+%% supervisor there if needed. Runs locally when Node is this node,
+%% otherwise over RPC. An {error, _} from the remote side is passed
+%% through unchanged; an RPC failure is reported as {error, Reason}.
+-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
+vhost_sup(VHost, Local) when Local == node(self()) ->
+    vhost_sup(VHost);
+vhost_sup(VHost, Node) ->
+    case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+        {ok, Pid} when is_pid(Pid) ->
+            {ok, Pid};
+        {error, _} = Err ->
+            %% e.g. {error, {no_such_vhost, VHost}} from the remote node.
+            Err;
+        {badrpc, RpcErr} ->
+            {error, RpcErr}
+    end.
+
+%% Returns the local vhost supervisor pid for VHost, starting the
+%% supervisor if it is not running. Delegates to start_vhost/1, so a
+%% missing vhost yields {error, {no_such_vhost, VHost}}.
+-spec vhost_sup(rabbit_types:vhost()) ->
+          {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
+vhost_sup(VHost) ->
+    start_vhost(VHost).
+
+%% Records the wrapper pid and the vhost supervisor pid for VHost in
+%% the ETS table, replacing any existing record for the same vhost.
+-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
+save_vhost_sup(VHost, WrapperPid, VHostPid) ->
+    Entry = #vhost_sup{vhost = VHost,
+                       vhost_sup_pid = VHostPid,
+                       wrapper_pid = WrapperPid},
+    true = ets:insert(?MODULE, Entry),
+    ok.
+
+%% Fetches the #vhost_sup{} record stored for VHost, or `not_found`
+%% when the ETS table has no entry for it.
+-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+get_vhost_sup(VHost) ->
+    case ets:lookup(?MODULE, VHost) of
+        []                        -> not_found;
+        [#vhost_sup{} = VHostSup] -> VHostSup
+    end.
+
+%% Returns {ok, Pid} when a live vhost supervisor is recorded for VHost
+%% and `no_pid` otherwise. A record whose supervisor process is dead is
+%% removed from the ETS table as a side effect.
+-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
+vhost_sup_pid(VHost) ->
+    case get_vhost_sup(VHost) of
+        #vhost_sup{vhost_sup_pid = SupPid} = Entry ->
+            case erlang:is_process_alive(SupPid) of
+                false ->
+                    ets:delete_object(?MODULE, Entry),
+                    no_pid;
+                true ->
+                    {ok, SupPid}
+            end;
+        not_found ->
+            no_pid
+    end.
+
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl
new file mode 100644
index 0000000000..3ce726621f
--- /dev/null
+++ b/src/rabbit_vhost_sup_watcher.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% This module implements a watcher process which should stop
+%% the parent supervisor if its vhost is missing from the mnesia DB
+
+-module(rabbit_vhost_sup_watcher).
+
+-include("rabbit.hrl").
+
+-define(TICKTIME_RATIO, 4).
+
+-behaviour(gen_server2).
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+
+%% Starts an unregistered watcher process for VHost; init/1 receives
+%% [VHost].
+start_link(VHost) ->
+ gen_server2:start_link(?MODULE, [VHost], []).
+
+
+%% gen_server2 callback. Arranges for a `check_vhost` message to be
+%% sent to this process every interval() milliseconds; the vhost name
+%% is the whole server state.
+init([VHost]) ->
+    timer:send_interval(interval(), check_vhost),
+    {ok, VHost}.
+
+%% No calls are defined for the watcher: every request is answered
+%% with `ok` and the state is left unchanged.
+handle_call(_,_,VHost) ->
+ {reply, ok, VHost}.
+
+%% Casts are ignored; the state is left unchanged.
+handle_cast(_, VHost) ->
+ {noreply, VHost}.
+
+%% On every `check_vhost` tick, verifies that the vhost still exists.
+%% If it is gone, logs an error and stops this process with reason
+%% `normal`. Any other message is ignored.
+handle_info(check_vhost, VHost) ->
+    case rabbit_vhost:exists(VHost) of
+        false ->
+            rabbit_log:error(" Vhost \"~p\" is gone."
+                             " Stopping message store supervisor.",
+                             [VHost]),
+            {stop, normal, VHost};
+        true ->
+            {noreply, VHost}
+    end;
+handle_info(_, VHost) ->
+    {noreply, VHost}.
+
+%% Nothing to clean up on termination.
+terminate(_, _) -> ok.
+
+%% The state (the vhost name) is carried over unchanged on code upgrade.
+code_change(_OldVsn, VHost, _Extra) ->
+ {ok, VHost}.
+
+%% Interval between vhost existence checks, in milliseconds (the unit
+%% timer:send_interval/2 expects): ?TICKTIME_RATIO times the kernel
+%% net_ticktime. net_ticktime is configured in seconds, so the default
+%% is 60 and the value is converted to milliseconds explicitly. This
+%% keeps the interval the same whether or not net_ticktime is set.
+interval() ->
+    application:get_env(kernel, net_ticktime, 60) * 1000 * ?TICKTIME_RATIO. \ No newline at end of file
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
new file mode 100644
index 0000000000..3396f71fa9
--- /dev/null
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -0,0 +1,53 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup_wrapper).
+
+-include("rabbit.hrl").
+
+-behaviour(supervisor2).
+-export([init/1]).
+-export([start_link/1]).
+-export([start_vhost_sup/1]).
+
+%% Starts an unregistered wrapper supervisor for VHost; init/1 receives
+%% [VHost].
+start_link(VHost) ->
+ supervisor2:start_link(?MODULE, [VHost]).
+
+%% This module is a wrapper around the vhost supervisor that
+%% provides an exactly-once restart.
+
+%% rabbit_vhost_sup supervisor children are added dynamically,
+%% so one_for_all strategy cannot be used.
+
+%% supervisor2 callback. The wrapper has a single permanent child, the
+%% vhost supervisor, which is started through start_vhost_sup/1 so that
+%% every (re)start also registers the new pid and runs vhost recovery.
+init([VHost]) ->
+    %% Two restarts in 1 hour. One per message store.
+    %% The restart period is expressed in seconds, hence 3600.
+    {ok, {{one_for_all, 2, 3600},
+          [{rabbit_vhost_sup,
+            {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
+            permanent, infinity, supervisor,
+            [rabbit_vhost_sup]}]}}.
+
+%% Start function for the wrapper's child. Starts the vhost supervisor
+%% and, on success, records it and recovers the vhost before returning
+%% {ok, Pid}. Any other start result is returned untouched.
+start_vhost_sup(VHost) ->
+    register_and_recover(VHost, rabbit_vhost_sup:start_link(VHost)).
+
+%% The record (wrapper pid = this process, plus the vhost sup pid) has
+%% to be saved first: recovery can begin as soon as it is in place.
+register_and_recover(VHost, {ok, Pid} = Started) ->
+    ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
+    ok = rabbit_vhost:recover(VHost),
+    Started;
+register_and_recover(_VHost, Other) ->
+    Other.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 326e0491d0..b65536c0d4 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -41,17 +41,18 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- Mnesia = mnesia_memory(),
- MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]),
- MetricsETS = ets_memory([rabbit_metrics]),
- MetricsProc = try
- [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
- M
- catch
- error:badarg ->
- 0
- end,
- MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory(msg_stores()),
+ MetricsETS = ets_memory([rabbit_metrics]),
+ MetricsProc =
+ try
+ [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
+ M
+ catch
+ error:badarg ->
+ 0
+ end,
+ MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
[{total, Total},
{processes, Processes},
@@ -124,8 +125,8 @@ mnesia_memory() ->
_ -> 0
end.
-ets_memory(OwnerNames) ->
- lists:sum([V || {_K, V} <- ets_tables_memory(OwnerNames)]).
+ets_memory(Owners) ->
+ lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]).
ets_tables_memory(all) ->
[{ets:info(T, name), bytes(ets:info(T, memory))}
@@ -133,11 +134,14 @@ ets_tables_memory(all) ->
is_atom(T)];
ets_tables_memory(OwnerName) when is_atom(OwnerName) ->
ets_tables_memory([OwnerName]);
-ets_tables_memory(OwnerNames) when is_list(OwnerNames) ->
- Owners = [whereis(N) || N <- OwnerNames],
+ets_tables_memory(Owners) when is_list(Owners) ->
+ OwnerPids = lists:map(fun(O) when is_pid(O) -> O;
+ (O) when is_atom(O) -> whereis(O)
+ end,
+ Owners),
[{ets:info(T, name), bytes(ets:info(T, memory))}
|| T <- ets:all(),
- lists:member(ets:info(T, owner), Owners)].
+ lists:member(ets:info(T, owner), OwnerPids)].
bytes(Words) -> try
Words * erlang:system_info(wordsize)
@@ -146,10 +150,37 @@ bytes(Words) -> try
end.
interesting_sups() ->
- [[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()].
+ [queue_sups(), conn_sups() | interesting_sups0()].
+
+%% The rabbit_amqqueue_sup_sup child of every running vhost supervisor.
+queue_sups() ->
+ all_vhosts_children(rabbit_amqqueue_sup_sup).
+
+%% The message store children of every running vhost supervisor:
+%% all transient stores first, followed by all persistent ones.
+msg_stores() ->
+    Transient = all_vhosts_children(msg_store_transient),
+    Persistent = all_vhosts_children(msg_store_persistent),
+    Transient ++ Persistent.
+
+%% Collects, across all vhosts, the child called Name of each vhost
+%% supervisor. Returns [] when rabbit_vhost_sup_sup is not running;
+%% vhosts that lack the supervisor or the named child are skipped.
+all_vhosts_children(Name) ->
+    case whereis(rabbit_vhost_sup_sup) of
+        Pid when is_pid(Pid) ->
+            Wrappers = supervisor:which_children(rabbit_vhost_sup_sup),
+            lists:filtermap(
+              fun({_, VHostSupWrapper, _, _}) ->
+                      find_vhost_child(VHostSupWrapper, Name)
+              end,
+              Wrappers);
+        undefined ->
+            []
+    end.
+
+%% Walks wrapper -> rabbit_vhost_sup -> Name, answering in the
+%% {true, Child} | false form expected by lists:filtermap/2.
+find_vhost_child(VHostSupWrapper, Name) ->
+    case supervisor2:find_child(VHostSupWrapper, rabbit_vhost_sup) of
+        [VHostSup] ->
+            case supervisor2:find_child(VHostSup, Name) of
+                [Child] -> {true, Child};
+                []      -> false
+            end;
+        [] ->
+            false
+    end.
interesting_sups0() ->
- MsgIndexProcs = [msg_store_transient_vhost, msg_store_persistent_vhost],
+ MsgIndexProcs = msg_stores(),
MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),
[MsgIndexProcs, MgmtDbProcs, PluginProcs].
@@ -166,18 +197,19 @@ ranch_server_sups() ->
error:badarg -> []
end.
-conn_sups(With) -> [{Sup, With} || Sup <- conn_sups()].
+%% Pairs every element of Sups with the same With value.
+with(Sups, With) -> [{Sup, With} || Sup <- Sups].
-distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1} |
- conn_sups(fun conn_type/1)].
+%% {Sup, Fun} pairs: queue supervisors are paired with queue_type/1 and
+%% connection supervisors with conn_type/1.
+distinguishers() -> with(queue_sups(), fun queue_type/1) ++
+ with(conn_sups(), fun conn_type/1).
distinguished_interesting_sups() ->
- [[{rabbit_amqqueue_sup_sup, master}],
- [{rabbit_amqqueue_sup_sup, slave}],
- conn_sups(reader),
- conn_sups(writer),
- conn_sups(channel),
- conn_sups(other)]
+ [
+ with(queue_sups(), master),
+ with(queue_sups(), slave),
+ with(conn_sups(), reader),
+ with(conn_sups(), writer),
+ with(conn_sups(), channel),
+ with(conn_sups(), other)]
++ interesting_sups0().
plugin_sups() ->
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index e17b2afbf3..3ff215f497 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -21,10 +21,11 @@
-compile(export_all).
--define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(TIMEOUT, 30000).
+-define(VHOST, <<"/">>).
-define(VARIABLE_QUEUE_TESTCASES, [
variable_queue_dynamic_duration_change,
@@ -253,9 +254,9 @@ msg_store1(_Config) ->
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
%% stop and restart, preserving every other msg in 2nd half
- ok = rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start_msg_store(
- #{}, {fun ([]) -> finished;
+ ok = rabbit_variable_queue:stop_msg_store(?VHOST),
+ ok = rabbit_variable_queue:start_msg_store(?VHOST,
+ [], {fun ([]) -> finished;
([MsgId|MsgIdsTail])
when length(MsgIdsTail) rem 2 == 0 ->
{MsgId, 1, MsgIdsTail};
@@ -330,8 +331,8 @@ msg_store1(_Config) ->
passed.
restart_msg_store_empty() ->
- ok = rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start_msg_store(
+ ok = rabbit_variable_queue:stop_msg_store(?VHOST),
+ ok = rabbit_variable_queue:start_msg_store(?VHOST,
undefined, {fun (ok) -> finished end, ok}).
msg_id_bin(X) ->
@@ -376,10 +377,10 @@ on_disk_stop(Pid) ->
msg_store_client_init_capture(MsgStore, Ref) ->
Pid = spawn(fun on_disk_capture/0),
- {Pid, rabbit_msg_store_vhost_sup:client_init(
- MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
- Pid ! {on_disk, MsgIds}
- end, undefined, <<"/">>)}.
+ {Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref,
+ fun (MsgIds, _ActionTaken) ->
+ Pid ! {on_disk, MsgIds}
+ end, undefined)}.
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
@@ -456,14 +457,16 @@ test_msg_store_confirm_timer() ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
- MSCState = rabbit_msg_store_vhost_sup:client_init(
- ?PERSISTENT_MSG_STORE, Ref,
- fun (MsgIds, _ActionTaken) ->
- case gb_sets:is_member(MsgId, MsgIds) of
- true -> Self ! on_disk;
- false -> ok
- end
- end, undefined, <<"/">>),
+ MSCState = rabbit_vhost_msg_store:client_init(
+ ?VHOST,
+ ?PERSISTENT_MSG_STORE,
+ Ref,
+ fun (MsgIds, _ActionTaken) ->
+ case gb_sets:is_member(MsgId, MsgIds) of
+ true -> Self ! on_disk;
+ false -> ok
+ end
+ end, undefined),
ok = msg_store_write([MsgId], MSCState),
ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
ok = msg_store_remove([MsgId], MSCState),
@@ -651,8 +654,8 @@ bq_queue_index1(_Config) ->
Qi8
end),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
passed.
@@ -672,8 +675,8 @@ bq_queue_index_props1(_Config) ->
Qi2
end),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
passed.
@@ -718,7 +721,7 @@ bq_queue_recover1(Config) ->
true, false, [], none, <<"acting-user">>),
publish_and_confirm(Q, <<>>, Count),
- SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid),
+ SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q),
true = is_pid(SupPid),
exit(SupPid, kill),
exit(QPid, kill),
@@ -726,8 +729,8 @@ bq_queue_recover1(Config) ->
receive {'DOWN', MRef, process, QPid, _Info} -> ok
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
- rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ rabbit_amqqueue:stop(?VHOST),
+ rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)),
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
@@ -1275,14 +1278,14 @@ init_test_queue(QName) ->
Res.
restart_test_queue(Qi, QName) ->
- _ = rabbit_queue_index:terminate([], Qi),
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([QName]),
+ _ = rabbit_queue_index:terminate(?VHOST, [], Qi),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, [QName]),
init_test_queue(QName).
empty_test_queue(QName) ->
- ok = rabbit_variable_queue:stop(),
- {ok, _} = rabbit_variable_queue:start([]),
+ ok = rabbit_variable_queue:stop(?VHOST),
+ {ok, _} = rabbit_variable_queue:start(?VHOST, []),
{0, 0, Qi} = init_test_queue(QName),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
@@ -1337,7 +1340,7 @@ nop(_) -> ok.
nop(_, _) -> ok.
msg_store_client_init(MsgStore, Ref) ->
- rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>).
+ rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined, undefined).
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 10d97726f4..09d7ab4e95 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -28,10 +28,10 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
--export([start/1, stop/0]).
+-export([start/2, stop/1]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([start_msg_store/3, stop_msg_store/1, init/6]).
%%----------------------------------------------------------------------------
%% This test backing queue follows the variable queue implementation, with
@@ -87,7 +87,8 @@
io_batch_size,
%% default queue or lazy queue
- mode
+ mode,
+ virtual_host
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -111,10 +112,11 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost).
--define(TRANSIENT_MSG_STORE, msg_store_transient_vhost).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
+-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -184,7 +186,8 @@
disk_write_count :: non_neg_integer(),
io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy' }.
+ mode :: 'default' | 'lazy',
+ virtual_host :: rabbit_types:vhost() }.
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
@@ -213,55 +216,39 @@
%% Public API
%%----------------------------------------------------------------------------
-start(DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
+start(VHost, DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
%% Group recovery terms by vhost.
- {[], VhostRefs} = lists:foldl(
- fun
- %% We need to skip a queue name
- (non_clean_shutdown, {[_|QNames], VhostRefs}) ->
- {QNames, VhostRefs};
- (Terms, {[QueueName | QNames], VhostRefs}) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {QNames, VhostRefs};
- Ref ->
- #resource{virtual_host = VHost} = QueueName,
- Refs = case maps:find(VHost, VhostRefs) of
- {ok, Val} -> Val;
- error -> []
- end,
- {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)}
- end
- end,
- {DurableQueues, #{}},
- AllTerms),
- start_msg_store(VhostRefs, StartFunState),
+ ClientRefs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
-stop() ->
- ok = stop_msg_store(),
- ok = rabbit_queue_index:stop().
-
-start_msg_store(Refs, StartFunState) ->
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
- undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
- Refs, StartFunState]),
- %% Start message store for all known vhosts
- VHosts = rabbit_vhost:list(),
- lists:foreach(
- fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
- end,
- VHosts),
+stop(VHost) ->
+ ok = stop_msg_store(VHost),
+ ok = rabbit_queue_index:stop(VHost).
+
+start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
+ rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?TRANSIENT_MSG_STORE,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {ok, _} = rabbit_vhost_msg_store:start(VHost,
+ ?PERSISTENT_MSG_STORE,
+ Refs,
+ StartFunState),
+ rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
+
+stop_msg_store(VHost) ->
+ rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
+ rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
ok.
-stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(Queue, Recover, Callback) ->
init(
@@ -284,7 +271,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost);
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
@@ -309,10 +296,10 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
- rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
+ rabbit_vhost_msg_store:successfully_recovered_state(VHost, ?PERSISTENT_MSG_STORE), %% vhost first, as in client_init/start/stop
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient).
+ PersistentClient, TransientClient, VHost).
process_recovery_terms(Terms=non_clean_shutdown) ->
{rabbit_guid:gen(), Terms};
@@ -326,7 +313,8 @@ terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
persistent_bytes = PBytes,
index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} } =
+ msg_store_clients = {MSCStateP, MSCStateT},
+ virtual_host = VHost } =
purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
@@ -338,7 +326,7 @@ terminate(_Reason, State) ->
{persistent_count, PCount},
{persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
+ VHost, Terms, IndexState),
msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
@@ -990,10 +978,9 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_msg_store_vhost_sup:client_init(
+ rabbit_vhost_msg_store:client_init(VHost,
MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end,
- VHost).
+ fun () -> Callback(?MODULE, CloseFDsFun) end).
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
@@ -1084,7 +1071,7 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient) ->
+ PersistentClient, TransientClient, VHost) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{DeltaCount1, DeltaBytes1} =
@@ -1148,7 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
io_batch_size = IoBatchSize,
- mode = default },
+ mode = default,
+ virtual_host = VHost },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 457a4110fa..6e78c1579f 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -218,12 +218,18 @@ kill_queue(Node, QName) ->
queue_pid(Node, QName) ->
#amqqueue{pid = QPid,
- state = State} = lookup(Node, QName),
+ state = State,
+ name = #resource{virtual_host = VHost}} = lookup(Node, QName),
case State of
- crashed -> case sup_child(Node, rabbit_amqqueue_sup_sup) of
+ crashed ->
+ case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
+ {error, {queue_supervisor_not_found, _Result}} -> {error, no_sup}; %% no queue supervisor for this vhost
+ {ok, SPid} ->
+ case sup_child(Node, SPid) of
{ok, _} -> QPid; %% restarting
{error, no_child} -> crashed %% given up
- end;
+ end
+ end;
_ -> QPid
end.