diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-03-17 13:03:03 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-04-12 12:13:43 +0100 |
| commit | c464fdefa9c451d25c42573190f72f24a227ed4a (patch) | |
| tree | 3a277ae22ef2fecd4a01b37a0edc59342c64dc1b /src | |
| parent | 536674232015587befed5be325bc89aeca3c17ff (diff) | |
| download | rabbitmq-server-git-c464fdefa9c451d25c42573190f72f24a227ed4a.tar.gz | |
Per-vhost supervision trees for queues and message stores.
Per-vhost message stores can be restarted, but queues hold
references to the old message stores in their message store client data.
Queues also rely on the message store process to report confirms for
messages on disk.
Because after message store restart queues will not get any confirms and
will fail with badarg error trying to access message store with an old client,
queue processes should be restarted together with message stores.
Queue processes cannot monitor the message store because of the backing_queue
mechanism, so they should be controlled by a supervision tree. One tree will
contain the queues supervisor and the message store processes.
Per-vhost supervisor will restart if any of its children dies.
Per-vhost supervisor restart process will do queue and message store data recovery
the same way as pre-3.7 global message store did, just with VHost as an argument and
in a vhost data directory.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 104 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 208 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 127 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_watcher.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 2 |
17 files changed, 549 insertions, 299 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 680a6a2a98..2fa18ac0e5 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -819,10 +819,7 @@ boot_delegate() -> recover() -> rabbit_policy:recover(), - Qs = rabbit_amqqueue:recover(), - ok = rabbit_binding:recover(rabbit_exchange:recover(), - [QName || #amqqueue{name = QName} <- Qs]), - rabbit_amqqueue:start(Qs). + rabbit_vhost:recover(). maybe_insert_default_data() -> case rabbit_table:needs_default_data() of diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index d0c55b2c0e..a753f591c4 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -19,6 +19,7 @@ -behaviour(supervisor2). -export([start_link/0, start_queue_process/3]). +-export([start_for_vhost/1, stop_for_vhost/1, find_for_vhost/2]). -export([init/1]). @@ -36,14 +37,35 @@ %%---------------------------------------------------------------------------- start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + supervisor2:start_link(?MODULE, []). start_queue_process(Node, Q, StartMode) -> - {ok, _SupPid, QPid} = supervisor2:start_child( - {?SERVER, Node}, [Q, StartMode]), + #amqqueue{name = #resource{virtual_host = VHost}} = Q, + {ok, Sup} = find_for_vhost(VHost, Node), + {ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]), QPid. init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. + +find_for_vhost(VHost, Node) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node), + case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of + [QSup] -> {ok, QSup}; + Result -> {error, {queue_supervisor_not_found, Result}} + end. 
+ +start_for_vhost(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + +stop_for_vhost(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4061098a9d..cc2797489d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, policy_changed/2, callback/4, declare/7, +-export([recover/1, policy_changed/2, callback/4, declare/7, assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/1, immutable/1, @@ -36,7 +36,7 @@ -type type() :: atom(). -type fun_name() :: atom(). --spec recover() -> [name()]. +-spec recover(rabbit_types:vhost()) -> [name()]. -spec callback (rabbit_types:exchange(), fun_name(), fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'. @@ -107,10 +107,11 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, policy, user_who_performed_action]). -recover() -> +recover(VHost) -> Xs = rabbit_misc:table_filter( fun (#exchange{name = XName}) -> - mnesia:read({rabbit_exchange, XName}) =:= [] + XName#resource.virtual_host =:= VHost andalso + mnesia:read({rabbit_exchange, XName}) =:= [] end, fun (X, Tx) -> X1 = case Tx of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b006e37eb2..94710aed43 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4]). --export([start/1, stop/0, delete_crashed/1]). +-export([start/2, stop/1, delete_crashed/1]). -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). @@ -81,12 +81,12 @@ %% Backing queue %% --------------------------------------------------------------------------- -start(_DurableQueues) -> +start(_Vhost, _DurableQueues) -> %% This will never get called as this module will never be %% installed as the default BQ implementation. 
exit({not_valid_for_generic_backing_queue, ?MODULE}). -stop() -> +stop(_Vhost) -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 61623c9441..aec974c10c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -194,6 +194,7 @@ stop_pending_slaves(QName, Pids) -> [begin rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]), + %TODO: per-vhost supervisor case erlang:process_info(Pid, dictionary) of undefined -> ok; {dictionary, Dict} -> diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl deleted file mode 100644 index 5031c5f043..0000000000 --- a/src/rabbit_msg_store_vhost_sup.erl +++ /dev/null @@ -1,103 +0,0 @@ --module(rabbit_msg_store_vhost_sup). - --include("rabbit.hrl"). - --behaviour(supervisor2). - --export([start_link/3, init/1, add_vhost/2, delete_vhost/2, - client_init/5, successfully_recovered_state/2]). - -%% Internal --export([start_store_for_vhost/4]). - -start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs); - VhostsClientRefs == undefined -> - supervisor2:start_link({local, Type}, ?MODULE, - [Type, VhostsClientRefs, StartupFunState]). - -init([Type, VhostsClientRefs, StartupFunState]) -> - ets:new(Type, [named_table, public]), - {ok, {{simple_one_for_one, 1, 1}, - [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, - [Type, VhostsClientRefs, StartupFunState]}, - transient, infinity, supervisor, [rabbit_msg_store]}]}}. - - -add_vhost(Type, VHost) -> - VHostPid = maybe_start_store_for_vhost(Type, VHost), - {ok, VHostPid}. 
- -start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - ok = rabbit_file:ensure_dir(VHostDir), - rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), - VhostRefs = refs_for_vhost(VHost, VhostsClientRefs), - VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost), - case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of - {ok, Pid} -> - ets:insert(Type, {VHost, Pid}), - {ok, Pid}; - Other -> Other - end; - Pid when is_pid(Pid) -> - {error, {already_started, Pid}} - end. - -startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) -> - QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames, - VH == VHost ], - {Fun, {start, QNamesForVhost}}; -startup_fun_state_for_vhost(State, _VHost) -> State. - -refs_for_vhost(_, undefined) -> undefined; -refs_for_vhost(VHost, Refs) -> - case maps:find(VHost, Refs) of - {ok, Val} -> Val; - error -> [] - end. - - -delete_vhost(Type, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> ok; - Pid when is_pid(Pid) -> - supervisor2:terminate_child(Type, Pid), - cleanup_vhost_store(Type, VHost, Pid) - end, - ok. - -client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> - VHostPid = maybe_start_store_for_vhost(Type, VHost), - rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun). - -maybe_start_store_for_vhost(Type, VHost) -> - case supervisor2:start_child(Type, [VHost]) of - {ok, Pid} -> Pid; - {error, {already_started, Pid}} -> Pid; - Error -> throw(Error) - end. - -vhost_store_pid(Type, VHost) -> - case ets:lookup(Type, VHost) of - [] -> no_pid; - [{VHost, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> Pid; - false -> - cleanup_vhost_store(Type, VHost, Pid), - no_pid - end - end. 
- -cleanup_vhost_store(Type, VHost, Pid) -> - ets:delete_object(Type, {VHost, Pid}). - -successfully_recovered_state(Type, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> - throw({message_store_not_started, Type, VHost}); - Pid when is_pid(Pid) -> - rabbit_msg_store:successfully_recovered_state(Pid) - end. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 34e23260ba..41e65e8a1f 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -30,7 +30,7 @@ -export([enable/0]). --export([start/1, stop/0]). +-export([start/2, stop/1]). -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, @@ -83,22 +83,22 @@ enable() -> %%---------------------------------------------------------------------------- -start(QNames) -> +start(VHost, QNames) -> BQ = bq(), %% TODO this expand-collapse dance is a bit ridiculous but it's what %% rabbit_amqqueue:recover/0 expects. We could probably simplify %% this if we rejigged recovery a bit. {DupNames, ExpNames} = expand_queues(QNames), - case BQ:start(ExpNames) of + case BQ:start(VHost, ExpNames) of {ok, ExpRecovery} -> {ok, collapse_recovery(QNames, DupNames, ExpRecovery)}; Else -> Else end. -stop() -> +stop(VHost) -> BQ = bq(), - BQ:stop(). + BQ:stop(VHost). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bf80fe53a5..e3a1e35ab3 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -17,10 +17,10 @@ -module(rabbit_queue_index). -export([erase/1, init/3, reset_state/1, recover/6, - terminate/2, delete_and_terminate/1, + terminate/3, delete_and_terminate/1, pre_publish/7, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, - read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). + read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). 
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -export([scan_queue_segments/3]). @@ -261,7 +261,7 @@ on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}. --spec terminate([any()], qistate()) -> qistate(). +-spec terminate(rabbit_types:vhsot(), [any()], qistate()) -> qistate(). -spec delete_and_terminate(qistate()) -> qistate(). -spec publish(rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), @@ -278,7 +278,7 @@ -spec next_segment_boundary(seq_id()) -> seq_id(). -spec bounds(qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}. --spec start([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. +-spec start(rabbit_types:vhsot(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. -spec add_queue_ttl() -> 'ok'. @@ -321,9 +321,9 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State = #qistate { dir = Dir }) -> +terminate(VHost, Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), - rabbit_recovery_terms:store(filename:basename(Dir), + rabbit_recovery_terms:store(VHost, filename:basename(Dir), [{segments, SegmentCounts} | Terms]), State1. @@ -491,34 +491,36 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. 
-start(DurableQueueNames) -> - ok = rabbit_recovery_terms:start(), +start(VHost, DurableQueueNames) -> + ok = rabbit_recovery_terms:start(VHost), {DurableTerms, DurableDirectories} = lists:foldl( fun(QName, {RecoveryTerms, ValidDirectories}) -> DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read(DirName) of + RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of {error, _} -> non_clean_shutdown; {ok, Terms} -> Terms end, {[RecoveryInfo | RecoveryTerms], sets:add_element(DirName, ValidDirectories)} end, {[], sets:new()}, DurableQueueNames), - %% Any queue directory we've not been asked to recover is considered garbage rabbit_file:recursive_delete( [DirName || - DirName <- all_queue_directory_names(), + DirName <- all_queue_directory_names(VHost), not sets:is_element(filename:basename(DirName), DurableDirectories)]), - - rabbit_recovery_terms:clear(), + rabbit_recovery_terms:clear(VHost), %% The backing queue interface requires that the queue recovery terms %% which come back from start/1 are in the same order as DurableQueueNames OrderedTerms = lists:reverse(DurableTerms), {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. -stop() -> rabbit_recovery_terms:stop(). +stop(VHost) -> rabbit_recovery_terms:stop(VHost). + +all_queue_directory_names(VHost) -> + filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost), + "queues", "*"])). all_queue_directory_names() -> filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(), @@ -1447,6 +1449,6 @@ move_to_per_vhost_stores(#resource{} = QueueName) -> end, ok. -update_recovery_term(#resource{} = QueueName, Term) -> +update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) -> Key = queue_name_to_dir_name(QueueName), - rabbit_recovery_terms:store(Key, Term). + rabbit_recovery_terms:store(VHost, Key, Term). 
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index c941126cd3..19dbcd2fd0 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -21,9 +21,9 @@ -behaviour(gen_server). --export([start/0, stop/0, store/2, read/1, clear/0]). +-export([start/1, stop/1, store/3, read/2, clear/1]). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,40 +32,55 @@ -rabbit_upgrade({upgrade_recovery_terms, local, []}). -rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). -%%---------------------------------------------------------------------------- - --spec start() -> rabbit_types:ok_or_error(term()). --spec stop() -> rabbit_types:ok_or_error(term()). --spec store(file:filename(), term()) -> rabbit_types:ok_or_error(term()). --spec read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). --spec clear() -> 'ok'. +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- --define(SERVER, ?MODULE). +-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). +-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). +-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()). +-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). +-spec clear(rabbit_types:vhost()) -> 'ok'. -start() -> rabbit_sup:start_child(?MODULE). +%%---------------------------------------------------------------------------- -stop() -> rabbit_sup:stop_child(?MODULE). +start(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}), + ok. 
+ +stop(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end. -store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}). +store(VHost, DirBaseName, Terms) -> + dets:insert(VHost, {DirBaseName, Terms}). -read(DirBaseName) -> - case dets:lookup(?MODULE, DirBaseName) of +read(VHost, DirBaseName) -> + case dets:lookup(VHost, DirBaseName) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. -clear() -> - ok = dets:delete_all_objects(?MODULE), - flush(). +clear(VHost) -> + ok = dets:delete_all_objects(VHost), + flush(VHost). -start_link() -> gen_server:start_link(?MODULE, [], []). +start_link(VHost) -> + gen_server:start_link(?MODULE, [VHost], []). %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> - open_table(), + open_global_table(), try QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), Dirs = case rabbit_file:list_dir(QueuesDir) of @@ -75,37 +90,47 @@ upgrade_recovery_terms() -> [begin File = filename:join([QueuesDir, Dir, "clean.dot"]), case rabbit_file:read_term_file(File) of - {ok, Terms} -> ok = store(Dir, Terms); + {ok, Terms} -> ok = store(?MODULE, Dir, Terms); {error, _} -> ok end, file:delete(File) end || Dir <- Dirs], ok after - close_table() + close_global_table() end. persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. dets_upgrade(Fun)-> - open_table(), + open_global_table(), try ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> - store(DirBaseName, Fun(Terms)), + store(?MODULE, DirBaseName, Fun(Terms)), Acc end, ok, ?MODULE), ok after - close_table() + close_global_table() end. +open_global_table() -> + File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), + {ok, _} = dets:open_file(?MODULE, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]). 
+ +close_global_table() -> + ok = dets:sync(?MODULE), + ok = dets:close(?MODULE). + %%---------------------------------------------------------------------------- -init(_) -> +init([VHost]) -> process_flag(trap_exit, true), - open_table(), - {ok, undefined}. + open_table(VHost), + {ok, VHost}. handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. @@ -113,22 +138,23 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - close_table(). +terminate(_Reason, VHost) -> + close_table(VHost). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -open_table() -> - File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), - {ok, _} = dets:open_file(?MODULE, [{file, File}, - {ram_file, true}, - {auto_save, infinity}]). +open_table(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + File = filename:join(VHostDir, "recovery.dets"), + {ok, _} = dets:open_file(VHost, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]). -flush() -> ok = dets:sync(?MODULE). +flush(VHost) -> ok = dets:sync(VHost). -close_table() -> - ok = flush(), - ok = dets:close(?MODULE). +close_table(VHost) -> + ok = flush(VHost), + ok = dets:close(VHost). 
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index eae464219a..ed2143a2b9 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -262,7 +262,8 @@ maybe_upgrade_local() -> maybe_migrate_queues_to_per_vhost_storage() -> Result = case rabbit_version:upgrades_required(message_store) of {error, version_not_available} -> version_not_available; - {error, starting_from_scratch} -> starting_from_scratch; + {error, starting_from_scratch} -> + starting_from_scratch; {error, _} = Err -> throw(Err); {ok, []} -> ok; {ok, Upgrades} -> apply_upgrades(message_store, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 300da96441..556f92acb7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -28,19 +28,17 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, multiple_routing_keys/0]). --export([start/1, stop/0]). - -%% exported for parallel map --export([add_vhost_msg_store/1]). +-export([start/2, stop/1]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/3, stop_msg_store/1, init/6]). -export([move_messages_to_vhost_store/0]). --export([stop_vhost_msg_store/1]). + -include_lib("stdlib/include/qlc.hrl"). -define(QUEUE_MIGRATION_BATCH_SIZE, 100). +-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on @@ -320,7 +318,10 @@ %% number of reduce_memory_usage executions, once it %% reaches a threshold the queue will manually trigger a runtime GC %% see: maybe_execute_gc/1 - memory_reduction_run_count + memory_reduction_run_count, + %% Queue data is grouped by VHost. We need to store it + %% to work with queue index. + virtual_host }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -345,8 +346,6 @@ }). -define(HEADER_GUESS_SIZE, 100). 
%% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -477,66 +476,37 @@ explicit_gc_run_operation_threshold_for_mode(Mode) -> %% Public API %%---------------------------------------------------------------------------- -start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), +start(VHost, DurableQueues) -> + {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), %% Group recovery terms by vhost. - {[], VhostRefs} = lists:foldl( - fun - %% We need to skip a queue name - (non_clean_shutdown, {[_|QNames], VhostRefs}) -> - {QNames, VhostRefs}; - (Terms, {[QueueName | QNames], VhostRefs}) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {QNames, VhostRefs}; - Ref -> - #resource{virtual_host = VHost} = QueueName, - Refs = case maps:find(VHost, VhostRefs) of - {ok, Val} -> Val; - error -> [] - end, - {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} - end - end, - {DurableQueues, #{}}, - AllTerms), - start_msg_store(VhostRefs, StartFunState), + ClientRefs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. -stop() -> - ok = stop_msg_store(), - ok = rabbit_queue_index:stop(). 
- -start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, - [?TRANSIENT_MSG_STORE_SUP, - undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), - %% Start message store for all known vhosts - VHosts = rabbit_vhost:list(), - %% TODO: recovery is limited by queue index recovery - %% pool size. There is no point in parallelizing vhost - %% recovery until there will be a queue index - %% recovery pool per vhost - lists:foreach(fun(Vhost) -> - add_vhost_msg_store(Vhost) - end, - lists:sort(VHosts)), - ok. +stop(VHost) -> + ok = stop_msg_store(VHost), + ok = rabbit_queue_index:stop(VHost). -add_vhost_msg_store(VHost) -> +start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?TRANSIENT_MSG_STORE, + undefined, + ?EMPTY_START_FUN_STATE), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?PERSISTENT_MSG_STORE, + Refs, + StartFunState), rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). -stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). - -stop_vhost_msg_store(VHost) -> - rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), +stop_msg_store(VHost) -> + rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), + rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), ok. 
init(Queue, Recover, Callback) -> @@ -555,12 +525,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, + true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, - AsyncCallback, VHost)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, + AsyncCallback, VHost), VHost); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, @@ -569,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef, + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback, VHost), {C, fun (MsgId) when is_binary(MsgId) -> @@ -579,17 +549,18 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, end}; false -> {undefined, fun(_MsgId) -> false end} end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, + TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store_vhost_sup:successfully_recovered_state( - ?PERSISTENT_MSG_STORE_SUP, VHost), + rabbit_vhost_msg_store:successfully_recovered_state( + VHost, + ?PERSISTENT_MSG_STORE), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient). + PersistentClient, TransientClient, VHost). 
process_recovery_terms(Terms=non_clean_shutdown) -> {rabbit_guid:gen(), Terms}; @@ -600,7 +571,8 @@ process_recovery_terms(Terms) -> end. terminate(_Reason, State) -> - State1 = #vqstate { persistent_count = PCount, + State1 = #vqstate { virtual_host = VHost, + persistent_count = PCount, persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -614,9 +586,9 @@ terminate(_Reason, State) -> Terms = [{persistent_ref, PRef}, {persistent_count, PCount}, {persistent_bytes, PBytes}], - a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), - msg_store_clients = undefined }). + a(State1#vqstate { + index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState), + msg_store_clients = undefined }). %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -1270,12 +1242,12 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> Callback, VHost). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP), - rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> - Callback(?MODULE, CloseFDsFun) - end, - VHost). + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), + rabbit_vhost_msg_store:client_init(VHost, MsgStore, + Ref, MsgOnDiskFun, + fun () -> + Callback(?MODULE, CloseFDsFun) + end). 
msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -1379,7 +1351,7 @@ expand_delta(_SeqId, #delta { count = Count, %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient) -> + PersistentClient, TransientClient, VHost) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {DeltaCount1, DeltaBytes1} = @@ -1446,7 +1418,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, mode = default, - memory_reduction_run_count = 0}, + memory_reduction_run_count = 0, + virtual_host = VHost}, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -2771,16 +2744,21 @@ multiple_routing_keys() -> %% Assumes message store is not running transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun), - transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun). + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). transform_store(Store, TransformFun) -> rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). move_messages_to_vhost_store() -> + case list_persistent_queues() of + [] -> ok; + Queues -> move_messages_to_vhost_store(Queues) + end. + +move_messages_to_vhost_store(Queues) -> log_upgrade("Moving messages to per-vhost message store"), - Queues = list_persistent_queues(), %% Move the queue index for each persistent queue to the new store lists:foreach( fun(Queue) -> @@ -2791,20 +2769,18 @@ move_messages_to_vhost_store() -> %% Legacy (global) msg_store may require recovery. %% This upgrade step should only be started %% if we are upgrading from a pre-3.7.0 version. 
- {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues), + {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues), OldStore = run_old_persistent_store(RecoveryRefs, StartFunState), + + VHosts = rabbit_vhost:list(), + %% New store should not be recovered. - NewStoreSup = start_new_store_sup(), - Vhosts = rabbit_vhost:list(), - lists:foreach(fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost) - end, - Vhosts), + ok = start_new_store(VHosts), MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size, ?QUEUE_MIGRATION_BATCH_SIZE), in_batches(MigrationBatchSize, - {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, + {rabbit_variable_queue, migrate_queue, [OldStore]}, QueuesWithTerms, "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n", "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), @@ -2813,7 +2789,7 @@ move_messages_to_vhost_store() -> delete_old_store(OldStore), ok = rabbit_queue_index:stop(), - ok = rabbit_sup:stop_child(NewStoreSup), + ok = stop_new_store(VHosts), ok. in_batches(Size, MFA, List, MessageStart, MessageEnd) -> @@ -2840,12 +2816,12 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]), in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). 
-migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) -> +migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore) -> log_upgrade_verbose( "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", [Name, VHost]), OldStoreClient = get_global_store_client(OldStore), - NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup), + NewStoreClient = get_per_vhost_store_client(QueueName), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( @@ -2881,12 +2857,11 @@ migrate_message(MsgId, OldC, NewC) -> _ -> OldC end. -get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) -> - rabbit_msg_store_vhost_sup:client_init(NewStoreSup, - rabbit_guid:gen(), - fun(_,_) -> ok end, - fun() -> ok end, - VHost). +get_per_vhost_store_client(#resource{virtual_host = VHost}) -> + rabbit_vhost_msg_store:client_init(VHost, ?PERSISTENT_MSG_STORE, + rabbit_guid:gen(), + fun(_,_) -> ok end, + fun() -> ok end). get_global_store_client(OldStore) -> rabbit_msg_store:client_init(OldStore, @@ -2905,9 +2880,11 @@ list_persistent_queues() -> mnesia:read(rabbit_queue, Name, read) =:= []])) end). -start_recovery_terms(Queues) -> +read_old_recovery_terms([]) -> + {[], [], ?EMPTY_START_FUN_STATE}; +read_old_recovery_terms(Queues) -> QueueNames = [Name || #amqqueue{name = Name} <- Queues], - {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames), + {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames), Refs = [Ref || Terms <- AllTerms, Terms /= non_clean_shutdown, begin @@ -2923,13 +2900,22 @@ run_old_persistent_store(Refs, StartFunState) -> Refs, StartFunState]), OldStoreName. -start_new_store_sup() -> - % Start persistent store sup without recovery. 
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, - rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE_SUP, - undefined, {fun (ok) -> finished end, ok}]), - ?PERSISTENT_MSG_STORE_SUP. +start_new_store(VHosts) -> + %% Ensure vhost supervisor is started, so we can add vhosts to it. + %% TODO: Start message store for vhost without a supervisor. + lists:foreach(fun(VHost) -> + % Start persistent store without recovery. + {ok, _} = rabbit_vhost_msg_store:start(VHost, ?PERSISTENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE) + end, + VHosts), + ok. + +stop_new_store(VHosts) -> + lists:foreach(fun(VHost) -> + ok = rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE) + end, + VHosts), + ok. delete_old_store(OldStore) -> ok = rabbit_sup:stop_child(OldStore), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 5ed23d9114..6d046021fd 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,12 @@ %%---------------------------------------------------------------------------- +-export([recover/0, recover/1]). -export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2, set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). --export([purge_messages/1]). +-export([delete_storage/1]). -spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. @@ -42,6 +43,36 @@ -spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. +recover() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + rabbit_amqqueue:on_node_down(node()), + + rabbit_amqqueue:warn_file_limit(), + %% rabbit_vhost_sup_sup will start the actual recovery. + %% So recovery will be run every time a vhost supervisor is restarted. 
+ ok = rabbit_vhost_sup_sup:start(), + [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost) + || VHost <- rabbit_vhost:list()], + ok. + +recover(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + rabbit_log:info("Making sure data directory '~s' for vhost '~s' exists~n", + [VHostDir, VHost]), + VHostStubFile = filename:join(VHostDir, ".vhost"), + ok = rabbit_file:ensure_dir(VHostStubFile), + ok = file:write_file(VHostStubFile, VHost), +rabbit_log:info("Starting vhost ~p~n", [VHost]), + Qs = rabbit_amqqueue:recover(VHost), +rabbit_log:info("Queues recovered for vhost ~p~n", [VHost]), + ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), + [QName || #amqqueue{name = QName} <- Qs]), +rabbit_log:info("Bindings recovered for vhost ~p~n", [VHost]), + ok = rabbit_amqqueue:start(Qs), +rabbit_log:info("Queues started for vhost ~p~n", [VHost]), + ok. + %%---------------------------------------------------------------------------- -define(INFO_KEYS, [name, tracing]). @@ -96,17 +127,16 @@ delete(VHostPath, ActingUser) -> ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}, {user_who_performed_action, ActingUser}]), [ok = Fun() || Fun <- Funs], + %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors + %% on all the nodes. + rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. -purge_messages(VHost) -> +delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), - %% Message store is stopped to close file handles - rabbit_variable_queue:stop_vhost_msg_store(VHost), - ok = rabbit_file:recursive_delete([VhostDir]), - %% Ensure the store is terminated even if it was restarted during the delete operation - %% above. - rabbit_variable_queue:stop_vhost_msg_store(VHost). + %% Message store should be closed when vhost supervisor is closed. + ok = rabbit_file:recursive_delete([VhostDir]). 
assert_benign(ok, _) -> ok; assert_benign({ok, _}, _) -> ok; @@ -134,7 +164,6 @@ internal_delete(VHostPath, ActingUser) -> Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), - purge_messages(VHostPath), Fs1 ++ Fs2. exists(VHostPath) -> diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl new file mode 100644 index 0000000000..482ad082b8 --- /dev/null +++ b/src/rabbit_vhost_msg_store.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_msg_store). + +-include("rabbit.hrl"). + +-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]). + + +start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); + ClientRefs == undefined -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + +stop(VHost, Type) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type). 
+ +client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> + with_vhost_store(VHost, Type, fun(StorePid) -> + rabbit_msg_store:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun) + end). + +with_vhost_store(VHost, Type, Fun) -> + case vhost_store_pid(VHost, Type) of + no_pid -> + throw({message_store_not_started, Type, VHost}); + Pid when is_pid(Pid) -> + Fun(Pid) + end. + +vhost_store_pid(VHost, Type) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + case supervisor2:find_child(VHostSup, Type) of + [Pid] -> Pid; + [] -> no_pid + end. + +successfully_recovered_state(VHost, Type) -> + with_vhost_store(VHost, Type, fun(StorePid) -> + rabbit_msg_store:successfully_recovered_state(StorePid) + end).
\ No newline at end of file diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl new file mode 100644 index 0000000000..b8c7a649e5 --- /dev/null +++ b/src/rabbit_vhost_sup.erl @@ -0,0 +1,34 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup). + +-include("rabbit.hrl"). + +%% Supervisor is a per-vhost supervisor to contain queues and message stores +-behaviour(supervisor2). +-export([init/1]). +-export([start_link/1]). + +start_link(VHost) -> + supervisor2:start_link(?MODULE, [VHost]). + +init([VHost]) -> + {ok, {{one_for_all, 0, 1}, + [{rabbit_vhost_sup_watcher, + {rabbit_vhost_sup_watcher, start_link, [VHost]}, + intrinsic, ?WORKER_WAIT, worker, + [rabbit_vhost_sup]}]}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl new file mode 100644 index 0000000000..58a5925af9 --- /dev/null +++ b/src/rabbit_vhost_sup_sup.erl @@ -0,0 +1,127 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup_sup). + +-include("rabbit.hrl"). + +-behaviour(supervisor2). + +-export([init/1]). + +-export([start_link/0, start/0]). +-export([vhost_sup/1, vhost_sup/2]). +-export([start_vhost/1, stop_and_delete_vhost/1, delete_on_all_nodes/1]). + +start() -> + rabbit_sup:start_supervisor_child(?MODULE). + +start_link() -> + supervisor2:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + ets:new(?MODULE, [named_table, public]), + {ok, {{simple_one_for_one, 1, 5}, + [{rabbit_vhost, {rabbit_vhost_sup_sup, start_vhost, []}, + transient, infinity, supervisor, + [rabbit_vhost_sup_sup, rabbit_vhost_sup]}]}}. + +start_vhost(VHost) -> + case rabbit_vhost_sup:start_link(VHost) of + {ok, Pid} -> + ok = save_vhost_pid(VHost, Pid), + ok = rabbit_vhost:recover(VHost), + {ok, Pid}; + Other -> + Other + end. + +stop_and_delete_vhost(VHost) -> + case vhost_pid(VHost) of + no_pid -> ok; + Pid when is_pid(Pid) -> + rabbit_log:info("Stopping vhost supervisor ~p for vhost ~p~n", + [Pid, VHost]), + case supervisor2:terminate_child(?MODULE, Pid) of + ok -> + ok = rabbit_vhost:delete_storage(VHost); + Other -> + Other + end + end. + +delete_on_all_nodes(VHost) -> +%% TODO: failing nodes + [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. + +%% We take an optimistic approach when stopping a remote VHost supervisor. +stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> + stop_and_delete_vhost(VHost); +stop_and_delete_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, stop_and_delete_vhost, [VHost]) of + ok -> ok; + {badrpc, RpcErr} -> + rabbit_log:error("Failed to stop and delete a vhost ~p" + " on node ~p." 
+ " Reason: ~p", + [VHost, Node, RpcErr]), + {error, RpcErr} + end. + +vhost_sup(VHost, Local) when Local == node(self()) -> + vhost_sup(VHost); +vhost_sup(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +vhost_sup(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case vhost_pid(VHost) of + no_pid -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started, Pid}} -> {ok, Pid}; + Error -> throw(Error) + end; + Pid when is_pid(Pid) -> + {ok, Pid} + end + end. + +save_vhost_pid(VHost, Pid) -> + true = ets:insert(?MODULE, {VHost, Pid}), + ok. + +-spec vhost_pid(rabbit_types:vhost()) -> no_pid | pid(). +vhost_pid(VHost) -> + case ets:lookup(?MODULE, VHost) of + [] -> no_pid; + [{VHost, Pid}] -> + case erlang:is_process_alive(Pid) of + true -> Pid; + false -> + ets:delete_object(?MODULE, {VHost, Pid}), + no_pid + end + end. diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl new file mode 100644 index 0000000000..3ce726621f --- /dev/null +++ b/src/rabbit_vhost_sup_watcher.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. 
+%% + +%% This module implements a watcher process which should stop +%% the parent supervisor if its vhost is missing from the mnesia DB + +-module(rabbit_vhost_sup_watcher). + +-include("rabbit.hrl"). + +-define(TICKTIME_RATIO, 4). + +-behaviour(gen_server2). +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + + +start_link(VHost) -> + gen_server2:start_link(?MODULE, [VHost], []). + + +init([VHost]) -> + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost}. + +handle_call(_,_,VHost) -> + {reply, ok, VHost}. + +handle_cast(_, VHost) -> + {noreply, VHost}. + +handle_info(check_vhost, VHost) -> + case rabbit_vhost:exists(VHost) of + true -> {noreply, VHost}; + false -> + rabbit_log:error(" Vhost \"~p\" is gone." + " Stopping message store supervisor.", + [VHost]), + {stop, normal, VHost} + end; +handle_info(_, VHost) -> + {noreply, VHost}. + +terminate(_, _) -> ok. + +code_change(_OldVsn, VHost, _Extra) -> + {ok, VHost}. + +interval() -> + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 326e0491d0..24398c477b 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -144,7 +144,7 @@ bytes(Words) -> try catch _:_ -> 0 end. - +%% TODO: per-vhost supervisor interesting_sups() -> [[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()]. |
