diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-03-20 09:50:55 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-04-12 12:13:43 +0100 |
| commit | 256caaeb3e69294c5ff1c2a3d29cb3957d021f87 (patch) | |
| tree | 7c307bdc1e4c7864994c891ef1e6dd870b63b67c | |
| parent | 6a95535314def537e564a34c15733c1a20eff20d (diff) | |
| download | rabbitmq-server-git-256caaeb3e69294c5ff1c2a3d29cb3957d021f87.tar.gz | |
Migrating to per-vhost supervisor message store.
Support reading/saving recovery terms from global storage to per-vhost storages.
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 1 | ||||
| -rw-r--r-- | test/unit_inbroker_SUITE.erl | 2 |
7 files changed, 88 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index a753f591c4..347dbbb48a 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -19,7 +19,8 @@ -behaviour(supervisor2). -export([start_link/0, start_queue_process/3]). --export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]). +-export([start_for_vhost/1, stop_for_vhost/1, + find_for_vhost/2, find_for_vhost/1]). -export([init/1]). @@ -50,6 +51,11 @@ init([]) -> [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. +-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +find_for_vhost(VHost) -> + find_for_vhost(VHost, node()). + +-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}. find_for_vhost(VHost, Node) -> {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node), case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of @@ -57,6 +63,7 @@ find_for_vhost(VHost, Node) -> Result -> {error, {queue_supervisor_not_found, Result}} end. +-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), supervisor2:start_child( @@ -65,6 +72,7 @@ start_for_vhost(VHost) -> {rabbit_amqqueue_sup_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). +-spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index aec974c10c..5c1af7f4aa 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -194,12 +194,13 @@ stop_pending_slaves(QName, Pids) -> [begin rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]), - %TODO: per-vhost supervisor case erlang:process_info(Pid, dictionary) of undefined -> ok; {dictionary, Dict} -> + Vhost = QName#resource.virtual_host, + {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost), case proplists:get_value('$ancestors', Dict) of - [Sup, rabbit_amqqueue_sup_sup | _] -> + [Sup, AmqQSup | _] -> exit(Sup, kill), exit(Pid, kill); _ -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e3a1e35ab3..23469f4593 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -26,7 +26,10 @@ -export([scan_queue_segments/3]). %% Migrates from global to per-vhost message stores --export([move_to_per_vhost_stores/1, update_recovery_term/2]). +-export([move_to_per_vhost_stores/1, + update_recovery_term/2, + read_global_recovery_terms/1, + cleanup_global_recovery_terms/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -516,6 +519,33 @@ start(VHost, DurableQueueNames) -> OrderedTerms = lists:reverse(DurableTerms), {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +read_global_recovery_terms(DurableQueueNames) -> + ok = rabbit_recovery_terms:open_global_table(), + + DurableTerms = + lists:foldl( + fun(QName, RecoveryTerms) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + [RecoveryInfo | RecoveryTerms] + end, [], DurableQueueNames), + + ok = rabbit_recovery_terms:close_global_table(), + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +cleanup_global_recovery_terms() -> + rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), + rabbit_recovery_terms:delete_global_table(), + ok. + + stop(VHost) -> rabbit_recovery_terms:stop(VHost). all_queue_directory_names(VHost) -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 19dbcd2fd0..be9b1b6227 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -28,6 +28,9 @@ terminate/2, code_change/3]). -export([upgrade_recovery_terms/0, persistent_bytes/0]). +-export([open_global_table/0, close_global_table/0, + read_global/1, delete_global_table/0]). +-export([open_table/1, close_table/1]). -rabbit_upgrade({upgrade_recovery_terms, local, []}). -rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). @@ -119,12 +122,19 @@ open_global_table() -> File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), {ok, _} = dets:open_file(?MODULE, [{file, File}, {ram_file, true}, - {auto_save, infinity}]). + {auto_save, infinity}]), + ok. close_global_table() -> ok = dets:sync(?MODULE), ok = dets:close(?MODULE). +read_global(DirBaseName) -> + read(?MODULE, DirBaseName). + +delete_global_table() -> + file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")). + %%---------------------------------------------------------------------------- init([VHost]) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 556f92acb7..599045b01f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2753,7 +2753,7 @@ transform_store(Store, TransformFun) -> move_messages_to_vhost_store() -> case list_persistent_queues() of - [] -> ok; + % [] -> ok; Queues -> move_messages_to_vhost_store(Queues) end. @@ -2776,21 +2776,23 @@ move_messages_to_vhost_store(Queues) -> VHosts = rabbit_vhost:list(), %% New store should not be recovered. - ok = start_new_store(VHosts), + NewMsgStore = start_new_store(VHosts), + %% Recovery terms should be started for all vhosts for new store. + [{ok, _} = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts], + MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size, ?QUEUE_MIGRATION_BATCH_SIZE), in_batches(MigrationBatchSize, - {rabbit_variable_queue, migrate_queue, [OldStore]}, + {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]}, QueuesWithTerms, "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n", "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), log_upgrade("Message store migration finished"), - delete_old_store(OldStore), - - ok = rabbit_queue_index:stop(), - ok = stop_new_store(VHosts), - ok. + ok = delete_old_store(OldStore), + ok = rabbit_queue_index:cleanup_global_recovery_terms(), + [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts], + ok = stop_new_store(NewMsgStore). in_batches(Size, MFA, List, MessageStart, MessageEnd) -> in_batches(Size, 1, MFA, List, MessageStart, MessageEnd). @@ -2816,12 +2818,14 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]), in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). -migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore) -> +migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, + RecoveryTerm}, + OldStore, NewStore) -> log_upgrade_verbose( "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", [Name, VHost]), OldStoreClient = get_global_store_client(OldStore), - NewStoreClient = get_per_vhost_store_client(QueueName), + NewStoreClient = get_per_vhost_store_client(QueueName, NewStore), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( @@ -2857,11 +2861,10 @@ migrate_message(MsgId, OldC, NewC) -> _ -> OldC end. -get_per_vhost_store_client(#resource{virtual_host = VHost}) -> - rabbit_vhost_msg_store:client_init(VHost, ?PERSISTENT_MSG_STORE, - rabbit_guid:gen(), - fun(_,_) -> ok end, - fun() -> ok end). +get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) -> + {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore), + rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(), + fun(_,_) -> ok end, fun() -> ok end). get_global_store_client(OldStore) -> rabbit_msg_store:client_init(OldStore, @@ -2902,19 +2905,22 @@ run_old_persistent_store(Refs, StartFunState) -> start_new_store(VHosts) -> %% Ensure vhost supervisor is started, so we can add vhsots to it. - %% TODO: Start message store for vhost without a supervisor. - lists:foreach(fun(VHost) -> - % Start persistent store without recovery. - {ok, _} = rabbit_vhost_msg_store:start(VHost, ?PERSISTENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE) + lists:map(fun(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE, + VHostDir, + undefined, + ?EMPTY_START_FUN_STATE), + {VHost, Pid} end, - VHosts), - ok. + VHosts). -stop_new_store(VHosts) -> - lists:foreach(fun(VHost) -> - ok = rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE) +stop_new_store(NewStore) -> + lists:foreach(fun({_VHost, StorePid}) -> + unlink(StorePid), + exit(StorePid, shutdown) end, - VHosts), + NewStore), ok. delete_old_store(OldStore) -> @@ -2923,7 +2929,8 @@ delete_old_store(OldStore) -> [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), %% Delete old transient store as well rabbit_file:recursive_delete( - [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]). + [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]), + ok. log_upgrade(Msg) -> log_upgrade(Msg, []). diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 58a5925af9..90f7be503e 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -64,7 +64,6 @@ stop_and_delete_vhost(VHost) -> end. delete_on_all_nodes(VHost) -> -%% TODO: failing nodes [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl index 600ece9378..be7a6757f7 100644 --- a/test/unit_inbroker_SUITE.erl +++ b/test/unit_inbroker_SUITE.erl @@ -829,7 +829,7 @@ bq_queue_recover1(Config) -> rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), true, false, [], none, <<"acting-user">>), publish_and_confirm(Q, <<>>, Count), -%% TODO: per-vhost supervisor + SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q), true = is_pid(SupPid), exit(SupPid, kill), |
