summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-03-20 09:50:55 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2017-04-12 12:13:43 +0100
commit256caaeb3e69294c5ff1c2a3d29cb3957d021f87 (patch)
tree7c307bdc1e4c7864994c891ef1e6dd870b63b67c /src
parent6a95535314def537e564a34c15733c1a20eff20d (diff)
downloadrabbitmq-server-git-256caaeb3e69294c5ff1c2a3d29cb3957d021f87.tar.gz
Migrating to per-vhost supervisor message store.
Support reading/saving recovery terms from global storage to per-vhost storages.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl10
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_queue_index.erl32
-rw-r--r--src/rabbit_recovery_terms.erl12
-rw-r--r--src/rabbit_variable_queue.erl59
-rw-r--r--src/rabbit_vhost_sup_sup.erl1
6 files changed, 87 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index a753f591c4..347dbbb48a 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -19,7 +19,8 @@
-behaviour(supervisor2).
-export([start_link/0, start_queue_process/3]).
--export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]).
+-export([start_for_vhost/1, stop_for_vhost/1,
+ find_for_vhost/2, find_for_vhost/1]).
-export([init/1]).
@@ -50,6 +51,11 @@ init([]) ->
[{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
+-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
+find_for_vhost(VHost) ->
+ find_for_vhost(VHost, node()).
+
+-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
@@ -57,6 +63,7 @@ find_for_vhost(VHost, Node) ->
Result -> {error, {queue_supervisor_not_found, Result}}
end.
+-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
supervisor2:start_child(
@@ -65,6 +72,7 @@ start_for_vhost(VHost) ->
{rabbit_amqqueue_sup_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
{ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index aec974c10c..5c1af7f4aa 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -194,12 +194,13 @@ stop_pending_slaves(QName, Pids) ->
[begin
rabbit_mirror_queue_misc:log_warning(
QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
- %TODO: per-vhost supervisor
case erlang:process_info(Pid, dictionary) of
undefined -> ok;
{dictionary, Dict} ->
+ Vhost = QName#resource.virtual_host,
+ {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost),
case proplists:get_value('$ancestors', Dict) of
- [Sup, rabbit_amqqueue_sup_sup | _] ->
+ [Sup, AmqQSup | _] ->
exit(Sup, kill),
exit(Pid, kill);
_ ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e3a1e35ab3..23469f4593 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -26,7 +26,10 @@
-export([scan_queue_segments/3]).
%% Migrates from global to per-vhost message stores
--export([move_to_per_vhost_stores/1, update_recovery_term/2]).
+-export([move_to_per_vhost_stores/1,
+ update_recovery_term/2,
+ read_global_recovery_terms/1,
+ cleanup_global_recovery_terms/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -516,6 +519,33 @@ start(VHost, DurableQueueNames) ->
OrderedTerms = lists:reverse(DurableTerms),
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+read_global_recovery_terms(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:open_global_table(),
+
+ DurableTerms =
+ lists:foldl(
+ fun(QName, RecoveryTerms) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ [RecoveryInfo | RecoveryTerms]
+ end, [], DurableQueueNames),
+
+ ok = rabbit_recovery_terms:close_global_table(),
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+cleanup_global_recovery_terms() ->
+ rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
+ rabbit_recovery_terms:delete_global_table(),
+ ok.
+
+
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
all_queue_directory_names(VHost) ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 19dbcd2fd0..be9b1b6227 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -28,6 +28,9 @@
terminate/2, code_change/3]).
-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+-export([open_global_table/0, close_global_table/0,
+ read_global/1, delete_global_table/0]).
+-export([open_table/1, close_table/1]).
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
@@ -119,12 +122,19 @@ open_global_table() ->
File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
{ok, _} = dets:open_file(?MODULE, [{file, File},
{ram_file, true},
- {auto_save, infinity}]).
+ {auto_save, infinity}]),
+ ok.
close_global_table() ->
ok = dets:sync(?MODULE),
ok = dets:close(?MODULE).
+read_global(DirBaseName) ->
+ read(?MODULE, DirBaseName).
+
+delete_global_table() ->
+ file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")).
+
%%----------------------------------------------------------------------------
init([VHost]) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 556f92acb7..599045b01f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2753,7 +2753,7 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
case list_persistent_queues() of
- [] -> ok;
+ % [] -> ok;
Queues -> move_messages_to_vhost_store(Queues)
end.
@@ -2776,21 +2776,23 @@ move_messages_to_vhost_store(Queues) ->
VHosts = rabbit_vhost:list(),
%% New store should not be recovered.
- ok = start_new_store(VHosts),
+ NewMsgStore = start_new_store(VHosts),
+ %% Recovery terms should be started for all vhosts for new store.
+ [{ok, _} = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],
+
MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
?QUEUE_MIGRATION_BATCH_SIZE),
in_batches(MigrationBatchSize,
- {rabbit_variable_queue, migrate_queue, [OldStore]},
+ {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
QueuesWithTerms,
"message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
"message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
log_upgrade("Message store migration finished"),
- delete_old_store(OldStore),
-
- ok = rabbit_queue_index:stop(),
- ok = stop_new_store(VHosts),
- ok.
+ ok = delete_old_store(OldStore),
+ ok = rabbit_queue_index:cleanup_global_recovery_terms(),
+ [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
+ ok = stop_new_store(NewMsgStore).
in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
@@ -2816,12 +2818,14 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
-migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore) ->
+migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
+ RecoveryTerm},
+ OldStore, NewStore) ->
log_upgrade_verbose(
"Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
[Name, VHost]),
OldStoreClient = get_global_store_client(OldStore),
- NewStoreClient = get_per_vhost_store_client(QueueName),
+ NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
@@ -2857,11 +2861,10 @@ migrate_message(MsgId, OldC, NewC) ->
_ -> OldC
end.
-get_per_vhost_store_client(#resource{virtual_host = VHost}) ->
- rabbit_vhost_msg_store:client_init(VHost, ?PERSISTENT_MSG_STORE,
- rabbit_guid:gen(),
- fun(_,_) -> ok end,
- fun() -> ok end).
+get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
+ {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
+ rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(),
+ fun(_,_) -> ok end, fun() -> ok end).
get_global_store_client(OldStore) ->
rabbit_msg_store:client_init(OldStore,
@@ -2902,19 +2905,22 @@ run_old_persistent_store(Refs, StartFunState) ->
start_new_store(VHosts) ->
%% Ensure vhost supervisor is started, so we can add vhsots to it.
- %% TODO: Start message store for vhost without a supervisor.
- lists:foreach(fun(VHost) ->
- % Start persistent store without recovery.
- {ok, _} = rabbit_vhost_msg_store:start(VHost, ?PERSISTENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE)
+ lists:map(fun(VHost) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
+ VHostDir,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {VHost, Pid}
end,
- VHosts),
- ok.
+ VHosts).
-stop_new_store(VHosts) ->
- lists:foreach(fun(VHost) ->
- ok = rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE)
+stop_new_store(NewStore) ->
+ lists:foreach(fun({_VHost, StorePid}) ->
+ unlink(StorePid),
+ exit(StorePid, shutdown)
end,
- VHosts),
+ NewStore),
ok.
delete_old_store(OldStore) ->
@@ -2923,7 +2929,8 @@ delete_old_store(OldStore) ->
[filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
%% Delete old transient store as well
rabbit_file:recursive_delete(
- [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).
+ [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]),
+ ok.
log_upgrade(Msg) ->
log_upgrade(Msg, []).
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 58a5925af9..90f7be503e 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -64,7 +64,6 @@ stop_and_delete_vhost(VHost) ->
end.
delete_on_all_nodes(VHost) ->
-%% TODO: failing nodes
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.