summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl11
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl7
-rw-r--r--src/rabbit_sup.erl10
-rw-r--r--src/rabbit_variable_queue.erl10
-rw-r--r--src/rabbit_vhost.erl30
5 files changed, 43 insertions, 25 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1e929c7f7c..cb87af70e1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/4, successfully_recovered_state/1,
+-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
write/3, write_flow/3, read/2, contains/2, remove/2]).
@@ -479,10 +479,15 @@ start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
[Name, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
+start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+ gen_server2:start_link({local, Name}, ?MODULE,
+ [Name, Dir, ClientRefs, StartupFunState],
+ [{timeout, infinity}]).
+
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
@@ -548,7 +553,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
-set_maximum_since_use(Server, Age) when is_pid(Server) ->
+set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
index 6b4988b06d..65bf2c74e8 100644
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -26,7 +26,7 @@ add_vhost(Name, VHost) ->
start_store_for_vhost(Name, ClientRefs, StartupFunState, VHost) ->
case vhost_store_pid(Name, VHost) of
no_pid ->
- VHostDir = vhost_store_dir(VHost),
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
ok = rabbit_file:ensure_dir(VHostDir),
rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of
@@ -81,8 +81,3 @@ successfully_recovered_state(Name, VHost) ->
Pid when is_pid(Pid) ->
rabbit_msg_store:successfully_recovered_state(Pid)
end.
-
-vhost_store_dir(VHost) ->
- Dir = rabbit_mnesia:dir(),
- EncodedName = list_to_binary(rabbit_vhost:dir(VHost)),
- binary_to_list(filename:join([Dir, EncodedName])).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index ad70540e5b..a457938dc9 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2, start_child/3,
+-export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4,
start_supervisor_child/1, start_supervisor_child/2,
start_supervisor_child/3,
start_restartable_child/1, start_restartable_child/2,
@@ -37,6 +37,7 @@
-spec start_child(atom()) -> 'ok'.
-spec start_child(atom(), [any()]) -> 'ok'.
-spec start_child(atom(), atom(), [any()]) -> 'ok'.
+-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'.
-spec start_supervisor_child(atom()) -> 'ok'.
-spec start_supervisor_child(atom(), [any()]) -> 'ok'.
-spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'.
@@ -60,6 +61,13 @@ start_child(ChildId, Mod, Args) ->
{ChildId, {Mod, start_link, Args},
transient, ?WORKER_WAIT, worker, [Mod]})).
+start_child(ChildId, Mod, Fun, Args) ->
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, Fun, Args},
+ transient, ?WORKER_WAIT, worker, [Mod]})).
+
+
start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8995154886..fc8dee8385 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2715,9 +2715,9 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
rabbit_log:info("Moving messages to per-vhsot message store"),
Queues = list_persistent_queues(),
- %% Old msg_store may require recovery.
+ %% Legacy (global) msg_store may require recovery.
%% This upgrade step should only be started
- %% if we are upgrading from version with old store.
+ %% if we are upgrading from a pre-3.7.0 version.
{RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
%% New store should not be recovered.
@@ -2743,9 +2743,11 @@ move_messages_to_vhost_store() ->
ok = rabbit_sup:stop_child(NewStoreSup).
migrate_queue(Queue, OldStore, NewStoreSup) ->
+ #amqqueue{name = QueueName} = Queue,
+ rabbit_log:info("Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
+ [QueueName#resource.name, QueueName#resource.virtual_host]),
OldStoreClient = get_global_store_client(OldStore),
NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup),
- #amqqueue{name = QueueName} = Queue,
rabbit_queue_index:move_to_per_vhost_stores(QueueName),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
@@ -2813,7 +2815,7 @@ start_recovery_terms(Queues) ->
run_old_persistent_store(Refs, StartFunState) ->
OldStoreName = ?PERSISTENT_MSG_STORE,
- ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
+ ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link,
[OldStoreName, rabbit_mnesia:dir(),
Refs, StartFunState]),
OldStoreName.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 26fdaa0db8..bbf77f290f 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -23,7 +23,7 @@
-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2,
set_limits/2, limits_of/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
--export([dir/1]).
+-export([dir/1, msg_store_dir_path/1]).
-export([purge_messages/1]).
-spec add(rabbit_types:vhost()) -> 'ok'.
@@ -96,15 +96,15 @@ delete(VHostPath) ->
[ok = Fun() || Fun <- Funs],
ok.
-purge_messages(VHostPath) ->
- VhostDir = filename:join(rabbit_mnesia:dir(), dir(VHostPath)),
- rabbit_log:info("Deleting vhost message store directory at '~s'~n", [VhostDir]),
+purge_messages(VHost) ->
+ VhostDir = msg_store_dir_path(VHost),
+ rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
%% Message store is stopped to close file handles
- rabbit_variable_queue:stop_vhost_msg_store(VHostPath),
+ rabbit_variable_queue:stop_vhost_msg_store(VHost),
ok = rabbit_file:recursive_delete([VhostDir]),
- %% Second terminate is made in case message store is
- %% restarted during deletion
- rabbit_variable_queue:stop_vhost_msg_store(VHostPath).
+ %% Ensure the store is terminated even if it was restarted during the delete operation
+ %% above.
+ rabbit_variable_queue:stop_vhost_msg_store(VHost).
assert_benign(ok) -> ok;
assert_benign({ok, _}) -> ok;
@@ -179,6 +179,17 @@ set_limits(VHost = #vhost{}, undefined) ->
set_limits(VHost = #vhost{}, Limits) ->
VHost#vhost{limits = Limits}.
+
+dir(Vhost) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Vhost)),
+ rabbit_misc:format("~.36B", [Num]).
+
+msg_store_dir_path(VHost) ->
+ Dir = rabbit_mnesia:dir(),
+ EncodedName = list_to_binary(dir(VHost)),
+ binary_to_list(filename:join([Dir, "msg_stores", "vhosts", EncodedName])).
+
+
%%----------------------------------------------------------------------------
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
@@ -198,6 +209,3 @@ info_all(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()).
-dir(Vhost) ->
- <<Num:128>> = erlang:md5(term_to_binary(Vhost)),
- rabbit_misc:format("~.36B", [Num]).