diff options
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | priv/schema/rabbitmq.schema | 6 | ||||
| -rw-r--r-- | src/rabbit.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 114 | ||||
| -rw-r--r-- | src/rabbit_sup.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 238 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 171 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_watcher.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 86 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 67 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 108 | ||||
| -rw-r--r-- | test/crashing_queues_SUITE.erl | 12 |
24 files changed, 897 insertions, 444 deletions
@@ -117,7 +117,9 @@ define PROJECT_ENV %% rabbitmq-server-589 {proxy_protocol, false}, {disk_monitor_failure_retries, 10}, - {disk_monitor_failure_retry_interval, 120000} + {disk_monitor_failure_retry_interval, 120000}, + %% either "stop_node" or "ignore" + {vhost_restart_strategy, stop_node} ] endef diff --git a/priv/schema/rabbitmq.schema b/priv/schema/rabbitmq.schema index fab07baeb4..6ccea48022 100644 --- a/priv/schema/rabbitmq.schema +++ b/priv/schema/rabbitmq.schema @@ -949,6 +949,12 @@ end}. {mapping, "proxy_protocol", "rabbit.proxy_protocol", [{datatype, {enum, [true, false]}}]}. +%% Whether to stop the rabbit application if VHost data +%% cannot be recovered. + +{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy", + [{datatype, {enum, [stop_node, ignore]}}]}. + % ========================== % Lager section % ========================== diff --git a/src/rabbit.erl b/src/rabbit.erl index 5228984ad2..8e6c9ead26 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -147,12 +147,6 @@ [{description, "core initialized"}, {requires, kernel_ready}]}). --rabbit_boot_step({empty_db_check, - [{description, "empty DB check"}, - {mfa, {?MODULE, maybe_insert_default_data, []}}, - {requires, core_initialized}, - {enables, routing_ready}]}). - -rabbit_boot_step({upgrade_queues, [{description, "per-vhost message store migration"}, {mfa, {rabbit_upgrade, @@ -164,7 +158,13 @@ -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, - {requires, core_initialized}, + {requires, [core_initialized]}, + {enables, routing_ready}]}). + +-rabbit_boot_step({empty_db_check, + [{description, "empty DB check"}, + {mfa, {?MODULE, maybe_insert_default_data, []}}, + {requires, recovery}, {enables, routing_ready}]}). -rabbit_boot_step({mirrored_queues, @@ -829,10 +829,7 @@ boot_delegate() -> recover() -> rabbit_policy:recover(), - Qs = rabbit_amqqueue:recover(), - ok = rabbit_binding:recover(rabbit_exchange:recover(), - [QName || #amqqueue{name = QName} <- Qs]), - rabbit_amqqueue:start(Qs). + rabbit_vhost:recover(). maybe_insert_default_data() -> case rabbit_table:needs_default_data() of diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index d0c55b2c0e..347dbbb48a 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -19,6 +19,8 @@ -behaviour(supervisor2). -export([start_link/0, start_queue_process/3]). +-export([start_for_vhost/1, stop_for_vhost/1, + find_for_vhost/2, find_for_vhost/1]). -export([init/1]). @@ -36,14 +38,42 @@ %%---------------------------------------------------------------------------- start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + supervisor2:start_link(?MODULE, []). start_queue_process(Node, Q, StartMode) -> - {ok, _SupPid, QPid} = supervisor2:start_child( - {?SERVER, Node}, [Q, StartMode]), + #amqqueue{name = #resource{virtual_host = VHost}} = Q, + {ok, Sup} = find_for_vhost(VHost, Node), + {ok, _SupPid, QPid} = supervisor2:start_child(Sup, [Q, StartMode]), QPid. init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. + +-spec find_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +find_for_vhost(VHost) -> + find_for_vhost(VHost, node()). + +-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}. +find_for_vhost(VHost, Node) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node), + case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of + [QSup] -> {ok, QSup}; + Result -> {error, {queue_supervisor_not_found, Result}} + end. + +-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +start_for_vhost(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + +-spec stop_for_vhost(rabbit_types:vhost()) -> ok. +stop_for_vhost(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4061098a9d..cc2797489d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, policy_changed/2, callback/4, declare/7, +-export([recover/1, policy_changed/2, callback/4, declare/7, assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, update_scratch/3, update_decorators/1, immutable/1, @@ -36,7 +36,7 @@ -type type() :: atom(). -type fun_name() :: atom(). --spec recover() -> [name()]. +-spec recover(rabbit_types:vhost()) -> [name()]. -spec callback (rabbit_types:exchange(), fun_name(), fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'. @@ -107,10 +107,11 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, policy, user_who_performed_action]). -recover() -> +recover(VHost) -> Xs = rabbit_misc:table_filter( fun (#exchange{name = XName}) -> - mnesia:read({rabbit_exchange, XName}) =:= [] + XName#resource.virtual_host =:= VHost andalso + mnesia:read({rabbit_exchange, XName}) =:= [] end, fun (X, Tx) -> X1 = case Tx of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b9952178e0..fefa0de1c9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4]). --export([start/1, stop/0, delete_crashed/1]). +-export([start/2, stop/1, delete_crashed/1]). -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). @@ -81,12 +81,12 @@ %% Backing queue %% --------------------------------------------------------------------------- -start(_DurableQueues) -> +start(_Vhost, _DurableQueues) -> %% This will never get called as this module will never be %% installed as the default BQ implementation. exit({not_valid_for_generic_backing_queue, ?MODULE}). -stop() -> +stop(_Vhost) -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 748a5afdf5..ee697be501 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -197,8 +197,10 @@ stop_pending_slaves(QName, Pids) -> case erlang:process_info(Pid, dictionary) of undefined -> ok; {dictionary, Dict} -> + Vhost = QName#resource.virtual_host, + {ok, AmqQSup} = rabbit_amqqueue_sup_sup:find_for_vhost(Vhost), case proplists:get_value('$ancestors', Dict) of - [Sup, rabbit_amqqueue_sup_sup | _] -> + [Sup, AmqQSup | _] -> exit(Sup, kill), exit(Pid, kill); _ -> diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl deleted file mode 100644 index 5031c5f043..0000000000 --- a/src/rabbit_msg_store_vhost_sup.erl +++ /dev/null @@ -1,103 +0,0 @@ --module(rabbit_msg_store_vhost_sup). - --include("rabbit.hrl"). - --behaviour(supervisor2). - --export([start_link/3, init/1, add_vhost/2, delete_vhost/2, - client_init/5, successfully_recovered_state/2]). - -%% Internal --export([start_store_for_vhost/4]). - -start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs); - VhostsClientRefs == undefined -> - supervisor2:start_link({local, Type}, ?MODULE, - [Type, VhostsClientRefs, StartupFunState]). - -init([Type, VhostsClientRefs, StartupFunState]) -> - ets:new(Type, [named_table, public]), - {ok, {{simple_one_for_one, 1, 1}, - [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, - [Type, VhostsClientRefs, StartupFunState]}, - transient, infinity, supervisor, [rabbit_msg_store]}]}}. - - -add_vhost(Type, VHost) -> - VHostPid = maybe_start_store_for_vhost(Type, VHost), - {ok, VHostPid}. - -start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - ok = rabbit_file:ensure_dir(VHostDir), - rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), - VhostRefs = refs_for_vhost(VHost, VhostsClientRefs), - VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost), - case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of - {ok, Pid} -> - ets:insert(Type, {VHost, Pid}), - {ok, Pid}; - Other -> Other - end; - Pid when is_pid(Pid) -> - {error, {already_started, Pid}} - end. - -startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) -> - QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames, - VH == VHost ], - {Fun, {start, QNamesForVhost}}; -startup_fun_state_for_vhost(State, _VHost) -> State. - -refs_for_vhost(_, undefined) -> undefined; -refs_for_vhost(VHost, Refs) -> - case maps:find(VHost, Refs) of - {ok, Val} -> Val; - error -> [] - end. - - -delete_vhost(Type, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> ok; - Pid when is_pid(Pid) -> - supervisor2:terminate_child(Type, Pid), - cleanup_vhost_store(Type, VHost, Pid) - end, - ok. - -client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> - VHostPid = maybe_start_store_for_vhost(Type, VHost), - rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun). - -maybe_start_store_for_vhost(Type, VHost) -> - case supervisor2:start_child(Type, [VHost]) of - {ok, Pid} -> Pid; - {error, {already_started, Pid}} -> Pid; - Error -> throw(Error) - end. - -vhost_store_pid(Type, VHost) -> - case ets:lookup(Type, VHost) of - [] -> no_pid; - [{VHost, Pid}] -> - case erlang:is_process_alive(Pid) of - true -> Pid; - false -> - cleanup_vhost_store(Type, VHost, Pid), - no_pid - end - end. - -cleanup_vhost_store(Type, VHost, Pid) -> - ets:delete_object(Type, {VHost, Pid}). - -successfully_recovered_state(Type, VHost) -> - case vhost_store_pid(Type, VHost) of - no_pid -> - throw({message_store_not_started, Type, VHost}); - Pid when is_pid(Pid) -> - rabbit_msg_store:successfully_recovered_state(Pid) - end. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 34e23260ba..41e65e8a1f 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -30,7 +30,7 @@ -export([enable/0]). --export([start/1, stop/0]). +-export([start/2, stop/1]). -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, @@ -83,22 +83,22 @@ enable() -> %%---------------------------------------------------------------------------- -start(QNames) -> +start(VHost, QNames) -> BQ = bq(), %% TODO this expand-collapse dance is a bit ridiculous but it's what %% rabbit_amqqueue:recover/0 expects. We could probably simplify %% this if we rejigged recovery a bit. {DupNames, ExpNames} = expand_queues(QNames), - case BQ:start(ExpNames) of + case BQ:start(VHost, ExpNames) of {ok, ExpRecovery} -> {ok, collapse_recovery(QNames, DupNames, ExpRecovery)}; Else -> Else end. -stop() -> +stop(VHost) -> BQ = bq(), - BQ:stop(). + BQ:stop(VHost). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 67f783a8dd..a71eaf1ff4 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -17,16 +17,19 @@ -module(rabbit_queue_index). -export([erase/1, init/3, reset_state/1, recover/6, - terminate/2, delete_and_terminate/1, + terminate/3, delete_and_terminate/1, pre_publish/7, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, - read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). + read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -export([scan_queue_segments/3]). %% Migrates from global to per-vhost message stores --export([move_to_per_vhost_stores/1, update_recovery_term/2]). +-export([move_to_per_vhost_stores/1, + update_recovery_term/2, + read_global_recovery_terms/1, + cleanup_global_recovery_terms/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -261,7 +264,7 @@ on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}. --spec terminate([any()], qistate()) -> qistate(). +-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). -spec delete_and_terminate(qistate()) -> qistate(). -spec publish(rabbit_types:msg_id(), seq_id(), rabbit_types:message_properties(), boolean(), @@ -278,7 +281,7 @@ -spec next_segment_boundary(seq_id()) -> seq_id(). -spec bounds(qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}. --spec start([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. +-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}. -spec add_queue_ttl() -> 'ok'. @@ -321,9 +324,9 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State = #qistate { dir = Dir }) -> +terminate(VHost, Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), - rabbit_recovery_terms:store(filename:basename(Dir), + rabbit_recovery_terms:store(VHost, filename:basename(Dir), [{segments, SegmentCounts} | Terms]), State1. @@ -491,34 +494,63 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. -start(DurableQueueNames) -> - ok = rabbit_recovery_terms:start(), +start(VHost, DurableQueueNames) -> + ok = rabbit_recovery_terms:start(VHost), {DurableTerms, DurableDirectories} = lists:foldl( fun(QName, {RecoveryTerms, ValidDirectories}) -> DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read(DirName) of + RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of {error, _} -> non_clean_shutdown; {ok, Terms} -> Terms end, {[RecoveryInfo | RecoveryTerms], sets:add_element(DirName, ValidDirectories)} end, {[], sets:new()}, DurableQueueNames), - %% Any queue directory we've not been asked to recover is considered garbage rabbit_file:recursive_delete( [DirName || - DirName <- all_queue_directory_names(), + DirName <- all_queue_directory_names(VHost), not sets:is_element(filename:basename(DirName), DurableDirectories)]), + rabbit_recovery_terms:clear(VHost), - rabbit_recovery_terms:clear(), + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + + +read_global_recovery_terms(DurableQueueNames) -> + ok = rabbit_recovery_terms:open_global_table(), + + DurableTerms = + lists:foldl( + fun(QName, RecoveryTerms) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + [RecoveryInfo | RecoveryTerms] + end, [], DurableQueueNames), + ok = rabbit_recovery_terms:close_global_table(), %% The backing queue interface requires that the queue recovery terms %% which come back from start/1 are in the same order as DurableQueueNames OrderedTerms = lists:reverse(DurableTerms), {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. -stop() -> rabbit_recovery_terms:stop(). +cleanup_global_recovery_terms() -> + rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), + rabbit_recovery_terms:delete_global_table(), + ok. + + +stop(VHost) -> rabbit_recovery_terms:stop(VHost). + +all_queue_directory_names(VHost) -> + filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost), + "queues", "*"])). all_queue_directory_names() -> filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(), @@ -1447,6 +1479,6 @@ move_to_per_vhost_stores(#resource{} = QueueName) -> end, ok. -update_recovery_term(#resource{} = QueueName, Term) -> +update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) -> Key = queue_name_to_dir_name(QueueName), - rabbit_recovery_terms:store(Key, Term). + rabbit_recovery_terms:store(VHost, Key, Term). diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index c941126cd3..be9b1b6227 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -21,51 +21,69 @@ -behaviour(gen_server). --export([start/0, stop/0, store/2, read/1, clear/0]). +-export([start/1, stop/1, store/3, read/2, clear/1]). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([upgrade_recovery_terms/0, persistent_bytes/0]). +-export([open_global_table/0, close_global_table/0, + read_global/1, delete_global_table/0]). +-export([open_table/1, close_table/1]). -rabbit_upgrade({upgrade_recovery_terms, local, []}). -rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). -%%---------------------------------------------------------------------------- - --spec start() -> rabbit_types:ok_or_error(term()). --spec stop() -> rabbit_types:ok_or_error(term()). --spec store(file:filename(), term()) -> rabbit_types:ok_or_error(term()). --spec read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). --spec clear() -> 'ok'. +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- --define(SERVER, ?MODULE). +-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). +-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()). +-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()). +-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). +-spec clear(rabbit_types:vhost()) -> 'ok'. -start() -> rabbit_sup:start_child(?MODULE). +%%---------------------------------------------------------------------------- -stop() -> rabbit_sup:stop_child(?MODULE). +start(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}), + ok. + +stop(VHost) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end. -store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}). +store(VHost, DirBaseName, Terms) -> + dets:insert(VHost, {DirBaseName, Terms}). -read(DirBaseName) -> - case dets:lookup(?MODULE, DirBaseName) of +read(VHost, DirBaseName) -> + case dets:lookup(VHost, DirBaseName) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. -clear() -> - ok = dets:delete_all_objects(?MODULE), - flush(). +clear(VHost) -> + ok = dets:delete_all_objects(VHost), + flush(VHost). -start_link() -> gen_server:start_link(?MODULE, [], []). +start_link(VHost) -> + gen_server:start_link(?MODULE, [VHost], []). %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> - open_table(), + open_global_table(), try QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), Dirs = case rabbit_file:list_dir(QueuesDir) of @@ -75,37 +93,54 @@ upgrade_recovery_terms() -> [begin File = filename:join([QueuesDir, Dir, "clean.dot"]), case rabbit_file:read_term_file(File) of - {ok, Terms} -> ok = store(Dir, Terms); + {ok, Terms} -> ok = store(?MODULE, Dir, Terms); {error, _} -> ok end, file:delete(File) end || Dir <- Dirs], ok after - close_table() + close_global_table() end. persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. dets_upgrade(Fun)-> - open_table(), + open_global_table(), try ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> - store(DirBaseName, Fun(Terms)), + store(?MODULE, DirBaseName, Fun(Terms)), Acc end, ok, ?MODULE), ok after - close_table() + close_global_table() end. +open_global_table() -> + File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), + {ok, _} = dets:open_file(?MODULE, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]), + ok. + +close_global_table() -> + ok = dets:sync(?MODULE), + ok = dets:close(?MODULE). + +read_global(DirBaseName) -> + read(?MODULE, DirBaseName). + +delete_global_table() -> + file:delete(filename:join(rabbit_mnesia:dir(), "recovery.dets")). + %%---------------------------------------------------------------------------- -init(_) -> +init([VHost]) -> process_flag(trap_exit, true), - open_table(), - {ok, undefined}. + open_table(VHost), + {ok, VHost}. handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. @@ -113,22 +148,23 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - close_table(). +terminate(_Reason, VHost) -> + close_table(VHost). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -open_table() -> - File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), - {ok, _} = dets:open_file(?MODULE, [{file, File}, - {ram_file, true}, - {auto_save, infinity}]). +open_table(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + File = filename:join(VHostDir, "recovery.dets"), + {ok, _} = dets:open_file(VHost, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]). -flush() -> ok = dets:sync(?MODULE). +flush(VHost) -> ok = dets:sync(VHost). -close_table() -> - ok = flush(), - ok = dets:close(?MODULE). +close_table(VHost) -> + ok = flush(VHost), + ok = dets:close(VHost). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 38d561fa80..0622d16e61 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -16,7 +16,7 @@ -module(rabbit_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4, start_supervisor_child/1, start_supervisor_child/2, @@ -25,7 +25,7 @@ start_delayed_restartable_child/1, start_delayed_restartable_child/2, stop_child/1]). --export([init/1]). +-export([init/1, prep_stop/0]). -include("rabbit.hrl"). @@ -49,20 +49,20 @@ %%---------------------------------------------------------------------------- -start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - child_reply(supervisor:start_child( + child_reply(supervisor2:start_child( ?SERVER, {ChildId, {Mod, start_link, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). start_child(ChildId, Mod, Fun, Args) -> - child_reply(supervisor:start_child( + child_reply(supervisor2:start_child( ?SERVER, {ChildId, {Mod, Fun, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). @@ -73,7 +73,7 @@ start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). start_supervisor_child(ChildId, Mod, Args) -> - child_reply(supervisor:start_child( + child_reply(supervisor2:start_child( ?SERVER, {ChildId, {Mod, start_link, Args}, transient, infinity, supervisor, [Mod]})). @@ -85,20 +85,25 @@ start_delayed_restartable_child(M, A) -> start_restartable_child(M, A, true). start_restartable_child(Mod, Args, Delay) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), - child_reply(supervisor:start_child( + child_reply(supervisor2:start_child( ?SERVER, {Name, {rabbit_restartable_sup, start_link, [Name, {Mod, start_link, Args}, Delay]}, transient, infinity, supervisor, [rabbit_restartable_sup]})). stop_child(ChildId) -> - case supervisor:terminate_child(?SERVER, ChildId) of - ok -> supervisor:delete_child(?SERVER, ChildId); + case supervisor2:terminate_child(?SERVER, ChildId) of + ok -> supervisor2:delete_child(?SERVER, ChildId); E -> E end. init([]) -> {ok, {{one_for_all, 0, 1}, []}}. +prep_stop() -> + rabbit_log:info("Stopping dependencies...~n",[]), + Apps = rabbit_plugins:active(), + rabbit:stop_apps(app_utils:app_dependency_order(Apps, true)), + rabbit_log:info("Dependencies stopped...~n",[]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index eae464219a..ed2143a2b9 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -262,7 +262,8 @@ maybe_upgrade_local() -> maybe_migrate_queues_to_per_vhost_storage() -> Result = case rabbit_version:upgrades_required(message_store) of {error, version_not_available} -> version_not_available; - {error, starting_from_scratch} -> starting_from_scratch; + {error, starting_from_scratch} -> + starting_from_scratch; {error, _} = Err -> throw(Err); {ok, []} -> ok; {ok, Upgrades} -> apply_upgrades(message_store, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 300da96441..86321003c9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -28,19 +28,17 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, multiple_routing_keys/0]). --export([start/1, stop/0]). - -%% exported for parallel map --export([add_vhost_msg_store/1]). +-export([start/2, stop/1]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/3, stop_msg_store/1, init/6]). -export([move_messages_to_vhost_store/0]). --export([stop_vhost_msg_store/1]). + -include_lib("stdlib/include/qlc.hrl"). -define(QUEUE_MIGRATION_BATCH_SIZE, 100). +-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on @@ -320,7 +318,10 @@ %% number of reduce_memory_usage executions, once it %% reaches a threshold the queue will manually trigger a runtime GC %% see: maybe_execute_gc/1 - memory_reduction_run_count + memory_reduction_run_count, + %% Queue data is grouped by VHost. We need to store it + %% to work with queue index. + virtual_host }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -345,8 +346,6 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -477,66 +476,37 @@ explicit_gc_run_operation_threshold_for_mode(Mode) -> %% Public API %%---------------------------------------------------------------------------- -start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), +start(VHost, DurableQueues) -> + {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), %% Group recovery terms by vhost. - {[], VhostRefs} = lists:foldl( - fun - %% We need to skip a queue name - (non_clean_shutdown, {[_|QNames], VhostRefs}) -> - {QNames, VhostRefs}; - (Terms, {[QueueName | QNames], VhostRefs}) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {QNames, VhostRefs}; - Ref -> - #resource{virtual_host = VHost} = QueueName, - Refs = case maps:find(VHost, VhostRefs) of - {ok, Val} -> Val; - error -> [] - end, - {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} - end - end, - {DurableQueues, #{}}, - AllTerms), - start_msg_store(VhostRefs, StartFunState), + ClientRefs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. -stop() -> - ok = stop_msg_store(), - ok = rabbit_queue_index:stop(). - -start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, - [?TRANSIENT_MSG_STORE_SUP, - undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), - %% Start message store for all known vhosts - VHosts = rabbit_vhost:list(), - %% TODO: recovery is limited by queue index recovery - %% pool size. There is no point in parallelizing vhost - %% recovery until there will be a queue index - %% recovery pool per vhost - lists:foreach(fun(Vhost) -> - add_vhost_msg_store(Vhost) - end, - lists:sort(VHosts)), - ok. +stop(VHost) -> + ok = stop_msg_store(VHost), + ok = rabbit_queue_index:stop(VHost). -add_vhost_msg_store(VHost) -> +start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?TRANSIENT_MSG_STORE, + undefined, + ?EMPTY_START_FUN_STATE), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?PERSISTENT_MSG_STORE, + Refs, + StartFunState), rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). -stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). - -stop_vhost_msg_store(VHost) -> - rabbit_msg_store_vhost_sup:delete_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:delete_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), +stop_msg_store(VHost) -> + rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), + rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), ok. init(Queue, Recover, Callback) -> @@ -555,12 +525,12 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, + true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, - AsyncCallback, VHost)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, + AsyncCallback, VHost), VHost); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, @@ -569,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef, + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback, VHost), {C, fun (MsgId) when is_binary(MsgId) -> @@ -579,17 +549,18 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, end}; false -> {undefined, fun(_MsgId) -> false end} end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, + TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store_vhost_sup:successfully_recovered_state( - ?PERSISTENT_MSG_STORE_SUP, VHost), + rabbit_vhost_msg_store:successfully_recovered_state( + VHost, + ?PERSISTENT_MSG_STORE), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient). + PersistentClient, TransientClient, VHost). process_recovery_terms(Terms=non_clean_shutdown) -> {rabbit_guid:gen(), Terms}; @@ -600,23 +571,24 @@ process_recovery_terms(Terms) -> end. terminate(_Reason, State) -> - State1 = #vqstate { persistent_count = PCount, + State1 = #vqstate { virtual_host = VHost, + persistent_count = PCount, persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; - _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), + _ -> ok = maybe_client_terminate(MSCStateP), rabbit_msg_store:client_ref(MSCStateP) end, ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), Terms = [{persistent_ref, PRef}, {persistent_count, PCount}, {persistent_bytes, PBytes}], - a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), - msg_store_clients = undefined }). + a(State1#vqstate { + index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState), + msg_store_clients = undefined }). %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -1270,12 +1242,12 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> Callback, VHost). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP), - rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> - Callback(?MODULE, CloseFDsFun) - end, - VHost). + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), + rabbit_vhost_msg_store:client_init(VHost, MsgStore, + Ref, MsgOnDiskFun, + fun () -> + Callback(?MODULE, CloseFDsFun) + end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -1379,7 +1351,7 @@ expand_delta(_SeqId, #delta { count = Count, %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient) -> + PersistentClient, TransientClient, VHost) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {DeltaCount1, DeltaBytes1} = @@ -1446,7 +1418,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, mode = default, - memory_reduction_run_count = 0}, + memory_reduction_run_count = 0, + virtual_host = VHost}, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -2771,16 +2744,21 @@ multiple_routing_keys() -> %% Assumes message store is not running transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun), - transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun). + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). transform_store(Store, TransformFun) -> rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). move_messages_to_vhost_store() -> + case list_persistent_queues() of + % [] -> ok; + Queues -> move_messages_to_vhost_store(Queues) + end. + +move_messages_to_vhost_store(Queues) -> log_upgrade("Moving messages to per-vhost message store"), - Queues = list_persistent_queues(), %% Move the queue index for each persistent queue to the new store lists:foreach( fun(Queue) -> @@ -2791,30 +2769,30 @@ move_messages_to_vhost_store() -> %% Legacy (global) msg_store may require recovery. %% This upgrade step should only be started %% if we are upgrading from a pre-3.7.0 version. - {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues), + {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues), OldStore = run_old_persistent_store(RecoveryRefs, StartFunState), + + VHosts = rabbit_vhost:list(), + %% New store should not be recovered. - NewStoreSup = start_new_store_sup(), - Vhosts = rabbit_vhost:list(), - lists:foreach(fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(NewStoreSup, VHost) - end, - Vhosts), + NewMsgStore = start_new_store(VHosts), + %% Recovery terms should be started for all vhosts for new store. + [{ok, _} = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts], + MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size, ?QUEUE_MIGRATION_BATCH_SIZE), in_batches(MigrationBatchSize, - {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, + {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]}, QueuesWithTerms, "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n", "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), log_upgrade("Message store migration finished"), - delete_old_store(OldStore), - - ok = rabbit_queue_index:stop(), - ok = rabbit_sup:stop_child(NewStoreSup), - ok. + ok = delete_old_store(OldStore), + ok = rabbit_queue_index:cleanup_global_recovery_terms(), + [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts], + ok = stop_new_store(NewMsgStore). in_batches(Size, MFA, List, MessageStart, MessageEnd) -> in_batches(Size, 1, MFA, List, MessageStart, MessageEnd). @@ -2840,12 +2818,14 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]), in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). -migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) -> +migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, + RecoveryTerm}, + OldStore, NewStore) -> log_upgrade_verbose( "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", [Name, VHost]), OldStoreClient = get_global_store_client(OldStore), - NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup), + NewStoreClient = get_per_vhost_store_client(QueueName, NewStore), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( @@ -2881,12 +2861,10 @@ migrate_message(MsgId, OldC, NewC) -> _ -> OldC end. -get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) -> - rabbit_msg_store_vhost_sup:client_init(NewStoreSup, - rabbit_guid:gen(), - fun(_,_) -> ok end, - fun() -> ok end, - VHost). +get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) -> + {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore), + rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(), + fun(_,_) -> ok end, fun() -> ok end). get_global_store_client(OldStore) -> rabbit_msg_store:client_init(OldStore, @@ -2905,9 +2883,11 @@ list_persistent_queues() -> mnesia:read(rabbit_queue, Name, read) =:= []])) end). -start_recovery_terms(Queues) -> +read_old_recovery_terms([]) -> + {[], [], ?EMPTY_START_FUN_STATE}; +read_old_recovery_terms(Queues) -> QueueNames = [Name || #amqqueue{name = Name} <- Queues], - {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames), + {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames), Refs = [Ref || Terms <- AllTerms, Terms /= non_clean_shutdown, begin @@ -2923,13 +2903,25 @@ run_old_persistent_store(Refs, StartFunState) -> Refs, StartFunState]), OldStoreName. -start_new_store_sup() -> - % Start persistent store sup without recovery. - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, - rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE_SUP, - undefined, {fun (ok) -> finished end, ok}]), - ?PERSISTENT_MSG_STORE_SUP. +start_new_store(VHosts) -> + %% Ensure vhost supervisor is started, so we can add vhsots to it. + lists:map(fun(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE, + VHostDir, + undefined, + ?EMPTY_START_FUN_STATE), + {VHost, Pid} + end, + VHosts). + +stop_new_store(NewStore) -> + lists:foreach(fun({_VHost, StorePid}) -> + unlink(StorePid), + exit(StorePid, shutdown) + end, + NewStore), + ok. delete_old_store(OldStore) -> ok = rabbit_sup:stop_child(OldStore), @@ -2937,7 +2929,8 @@ delete_old_store(OldStore) -> [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), %% Delete old transient store as well rabbit_file:recursive_delete( - [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]). + [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]), + ok. log_upgrade(Msg) -> log_upgrade(Msg, []). @@ -2950,3 +2943,14 @@ log_upgrade_verbose(Msg) -> log_upgrade_verbose(Msg, Args) -> rabbit_log_upgrade:info(Msg, Args). + +maybe_client_terminate(MSCStateP) -> + %% Queue might have been asked to stop by the supervisor, it needs a clean + %% shutdown in order for the supervising strategy to work - if it reaches max + %% restarts might bring the vhost down. + try + rabbit_msg_store:client_terminate(MSCStateP) + catch + _:_ -> + ok + end. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 5ed23d9114..1d1ea16cca 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,12 @@ %%---------------------------------------------------------------------------- +-export([recover/0, recover/1]). -export([add/2, delete/2, exists/1, list/0, with/2, assert/1, update/2, set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). --export([purge_messages/1]). +-export([delete_storage/1]). -spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. @@ -42,6 +43,32 @@ -spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. +recover() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + rabbit_amqqueue:on_node_down(node()), + + rabbit_amqqueue:warn_file_limit(), + %% rabbit_vhost_sup_sup will start the actual recovery. + %% So recovery will be run every time a vhost supervisor is restarted. + ok = rabbit_vhost_sup_sup:start(), + [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost) + || VHost <- rabbit_vhost:list()], + ok. + +recover(VHost) -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + rabbit_log:info("Making sure data directory '~s' for vhost '~s' exists~n", + [VHostDir, VHost]), + VHostStubFile = filename:join(VHostDir, ".vhost"), + ok = rabbit_file:ensure_dir(VHostStubFile), + ok = file:write_file(VHostStubFile, VHost), + Qs = rabbit_amqqueue:recover(VHost), + ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), + [QName || #amqqueue{name = QName} <- Qs]), + ok = rabbit_amqqueue:start(Qs), + ok. + %%---------------------------------------------------------------------------- -define(INFO_KEYS, [name, tracing]). @@ -75,6 +102,7 @@ add(VHostPath, ActingUser) -> {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), + ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath), rabbit_event:notify(vhost_created, info(VHostPath) ++ [{user_who_performed_action, ActingUser}]), R. @@ -96,17 +124,16 @@ delete(VHostPath, ActingUser) -> ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}, {user_who_performed_action, ActingUser}]), [ok = Fun() || Fun <- Funs], + %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors + %% on all the nodes. + rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. -purge_messages(VHost) -> +delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), - %% Message store is stopped to close file handles - rabbit_variable_queue:stop_vhost_msg_store(VHost), - ok = rabbit_file:recursive_delete([VhostDir]), - %% Ensure the store is terminated even if it was restarted during the delete operation - %% above. - rabbit_variable_queue:stop_vhost_msg_store(VHost). + %% Message store should be closed when vhost supervisor is closed. + ok = rabbit_file:recursive_delete([VhostDir]). assert_benign(ok, _) -> ok; assert_benign({ok, _}, _) -> ok; @@ -134,7 +161,6 @@ internal_delete(VHostPath, ActingUser) -> Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info), ActingUser) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), - purge_messages(VHostPath), Fs1 ++ Fs2. exists(VHostPath) -> diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl new file mode 100644 index 0000000000..482ad082b8 --- /dev/null +++ b/src/rabbit_vhost_msg_store.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_msg_store). + +-include("rabbit.hrl"). + +-export([start/4, stop/2, client_init/5, successfully_recovered_state/2]). + + +start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); + ClientRefs == undefined -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + +stop(VHost, Type) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type). + +client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> + with_vhost_store(VHost, Type, fun(StorePid) -> + rabbit_msg_store:client_init(StorePid, Ref, MsgOnDiskFun, CloseFDsFun) + end). + +with_vhost_store(VHost, Type, Fun) -> + case vhost_store_pid(VHost, Type) of + no_pid -> + throw({message_store_not_started, Type, VHost}); + Pid when is_pid(Pid) -> + Fun(Pid) + end. + +vhost_store_pid(VHost, Type) -> + {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), + case supervisor2:find_child(VHostSup, Type) of + [Pid] -> Pid; + [] -> no_pid + end. + +successfully_recovered_state(VHost, Type) -> + with_vhost_store(VHost, Type, fun(StorePid) -> + rabbit_msg_store:successfully_recovered_state(StorePid) + end).
\ No newline at end of file diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl new file mode 100644 index 0000000000..b8c7a649e5 --- /dev/null +++ b/src/rabbit_vhost_sup.erl @@ -0,0 +1,34 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup). + +-include("rabbit.hrl"). + +%% Supervisor is a per-vhost supervisor to contain queues and message stores +-behaviour(supervisor2). +-export([init/1]). +-export([start_link/1]). + +start_link(VHost) -> + supervisor2:start_link(?MODULE, [VHost]). + +init([VHost]) -> + {ok, {{one_for_all, 0, 1}, + [{rabbit_vhost_sup_watcher, + {rabbit_vhost_sup_watcher, start_link, [VHost]}, + intrinsic, ?WORKER_WAIT, worker, + [rabbit_vhost_sup]}]}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl new file mode 100644 index 0000000000..e528d64e0e --- /dev/null +++ b/src/rabbit_vhost_sup_sup.erl @@ -0,0 +1,171 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup_sup). + +-include("rabbit.hrl"). + +-behaviour(supervisor2). + +-export([init/1]). + +-export([start_link/0, start/0]). +-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([delete_on_all_nodes/1]). +-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). + +%% Internal +-export([stop_and_delete_vhost/1]). + +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). + +start() -> + case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []}, + permanent, infinity, supervisor, [?MODULE]}) of + {ok, _} -> ok; + {error, Err} -> {error, Err} + end. + +start_link() -> + supervisor2:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of + ignore -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent + end, + + ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, + [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, + VhostRestart, infinity, supervisor, + [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. + +start_on_all_nodes(VHost) -> + [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. + +delete_on_all_nodes(VHost) -> + [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. + +start_vhost(VHost, Node) when Node == node(self()) -> + start_vhost(VHost); +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok; + Error -> throw(Error) + end, + {ok, _} = vhost_sup_pid(VHost); + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end + end. + +stop_and_delete_vhost(VHost) -> + case get_vhost_sup(VHost) of + not_found -> ok; + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid} = VHostSup -> + case is_process_alive(WrapperPid) of + false -> ok; + true -> + rabbit_log:info("Stopping vhost supervisor ~p" + " for vhost ~p~n", + [VHostSupPid, VHost]), + case supervisor2:terminate_child(?MODULE, WrapperPid) of + ok -> + ets:delete_object(?MODULE, VHostSup), + ok = rabbit_vhost:delete_storage(VHost); + Other -> + Other + end + end + end. + +%% We take an optimistic approach whan stopping a remote VHost supervisor. +stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> + stop_and_delete_vhost(VHost); +stop_and_delete_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, stop_and_delete_vhost, [VHost]) of + ok -> ok; + {badrpc, RpcErr} -> + rabbit_log:error("Failed to stop and delete a vhost ~p" + " on node ~p." + " Reason: ~p", + [VHost, Node, RpcErr]), + {error, RpcErr} + end. + +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. +vhost_sup(VHost, Local) when Local == node(self()) -> + vhost_sup(VHost); +vhost_sup(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +vhost_sup(VHost) -> + start_vhost(VHost). + +-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. +save_vhost_sup(VHost, WrapperPid, VHostPid) -> + true = ets:insert(?MODULE, #vhost_sup{vhost = VHost, + vhost_sup_pid = VHostPid, + wrapper_pid = WrapperPid}), + ok. + +-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. +get_vhost_sup(VHost) -> + case ets:lookup(?MODULE, VHost) of + [] -> not_found; + [#vhost_sup{} = VHostSup] -> VHostSup + end. + +-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}. +vhost_sup_pid(VHost) -> + case get_vhost_sup(VHost) of + not_found -> + no_pid; + #vhost_sup{vhost_sup_pid = Pid} = VHostSup -> + case erlang:is_process_alive(Pid) of + true -> {ok, Pid}; + false -> + ets:delete_object(?MODULE, VHostSup), + no_pid + end + end. + diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl new file mode 100644 index 0000000000..3ce726621f --- /dev/null +++ b/src/rabbit_vhost_sup_watcher.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +%% This module implements a watcher process which should stop +%% the parent supervisor if its vhost is missing from the mnesia DB + +-module(rabbit_vhost_sup_watcher). + +-include("rabbit.hrl"). + +-define(TICKTIME_RATIO, 4). + +-behaviour(gen_server2). +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + + +start_link(VHost) -> + gen_server2:start_link(?MODULE, [VHost], []). + + +init([VHost]) -> + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost}. + +handle_call(_,_,VHost) -> + {reply, ok, VHost}. + +handle_cast(_, VHost) -> + {noreply, VHost}. + +handle_info(check_vhost, VHost) -> + case rabbit_vhost:exists(VHost) of + true -> {noreply, VHost}; + false -> + rabbit_log:error(" Vhost \"~p\" is gone." + " Stopping message store supervisor.", + [VHost]), + {stop, normal, VHost} + end; +handle_info(_, VHost) -> + {noreply, VHost}. + +terminate(_, _) -> ok. + +code_change(_OldVsn, VHost, _Extra) -> + {ok, VHost}. + +interval() -> + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl new file mode 100644 index 0000000000..3396f71fa9 --- /dev/null +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -0,0 +1,53 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup_wrapper). + +-include("rabbit.hrl"). + +-behaviour(supervisor2). +-export([init/1]). +-export([start_link/1]). +-export([start_vhost_sup/1]). + +start_link(VHost) -> + supervisor2:start_link(?MODULE, [VHost]). + +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart. + +%% rabbit_vhost_sup supervisor children are added dynamically, +%% so one_for_all strategy cannot be used. + +init([VHost]) -> + %% Two restarts in 1 hour. One per message store. + {ok, {{one_for_all, 2, 3600000}, + [{rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + permanent, infinity, supervisor, + [rabbit_vhost_sup]}]}}. + +start_vhost_sup(VHost) -> + case rabbit_vhost_sup:start_link(VHost) of + {ok, Pid} -> + %% Save vhost sup record with wrapper pid and vhost sup pid. + ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), + %% We can start recover as soon as we have vhost_sup record saved + ok = rabbit_vhost:recover(VHost), + {ok, Pid}; + Other -> + Other + end. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 326e0491d0..b65536c0d4 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -41,17 +41,18 @@ memory() -> [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], - Mnesia = mnesia_memory(), - MsgIndexETS = ets_memory([msg_store_persistent_vhost, msg_store_transient_vhost]), - MetricsETS = ets_memory([rabbit_metrics]), - MetricsProc = try - [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), - M - catch - error:badarg -> - 0 - end, - MgmtDbETS = ets_memory([rabbit_mgmt_storage]), + Mnesia = mnesia_memory(), + MsgIndexETS = ets_memory(msg_stores()), + MetricsETS = ets_memory([rabbit_metrics]), + MetricsProc = + try + [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), + M + catch + error:badarg -> + 0 + end, + MgmtDbETS = ets_memory([rabbit_mgmt_storage]), [{total, Total}, {processes, Processes}, @@ -124,8 +125,8 @@ mnesia_memory() -> _ -> 0 end. -ets_memory(OwnerNames) -> - lists:sum([V || {_K, V} <- ets_tables_memory(OwnerNames)]). +ets_memory(Owners) -> + lists:sum([V || {_K, V} <- ets_tables_memory(Owners)]). ets_tables_memory(all) -> [{ets:info(T, name), bytes(ets:info(T, memory))} @@ -133,11 +134,14 @@ ets_tables_memory(all) -> is_atom(T)]; ets_tables_memory(OwnerName) when is_atom(OwnerName) -> ets_tables_memory([OwnerName]); -ets_tables_memory(OwnerNames) when is_list(OwnerNames) -> - Owners = [whereis(N) || N <- OwnerNames], +ets_tables_memory(Owners) when is_list(Owners) -> + OwnerPids = lists:map(fun(O) when is_pid(O) -> O; + (O) when is_atom(O) -> whereis(O) + end, + Owners), [{ets:info(T, name), bytes(ets:info(T, memory))} || T <- ets:all(), - lists:member(ets:info(T, owner), Owners)]. + lists:member(ets:info(T, owner), OwnerPids)]. bytes(Words) -> try Words * erlang:system_info(wordsize) @@ -146,10 +150,37 @@ bytes(Words) -> try end. interesting_sups() -> - [[rabbit_amqqueue_sup_sup], conn_sups() | interesting_sups0()]. + [queue_sups(), conn_sups() | interesting_sups0()]. + +queue_sups() -> + all_vhosts_children(rabbit_amqqueue_sup_sup). + +msg_stores() -> + all_vhosts_children(msg_store_transient) + ++ + all_vhosts_children(msg_store_persistent). + +all_vhosts_children(Name) -> + case whereis(rabbit_vhost_sup_sup) of + undefined -> []; + Pid when is_pid(Pid) -> + lists:filtermap( + fun({_, VHostSupWrapper, _, _}) -> + case supervisor2:find_child(VHostSupWrapper, + rabbit_vhost_sup) of + [] -> false; + [VHostSup] -> + case supervisor2:find_child(VHostSup, Name) of + [QSup] -> {true, QSup}; + [] -> false + end + end + end, + supervisor:which_children(rabbit_vhost_sup_sup)) + end. interesting_sups0() -> - MsgIndexProcs = [msg_store_transient_vhost, msg_store_persistent_vhost], + MsgIndexProcs = msg_stores(), MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), [MsgIndexProcs, MgmtDbProcs, PluginProcs]. @@ -166,18 +197,19 @@ ranch_server_sups() -> error:badarg -> [] end. -conn_sups(With) -> [{Sup, With} || Sup <- conn_sups()]. +with(Sups, With) -> [{Sup, With} || Sup <- Sups]. -distinguishers() -> [{rabbit_amqqueue_sup_sup, fun queue_type/1} | - conn_sups(fun conn_type/1)]. +distinguishers() -> with(queue_sups(), fun queue_type/1) ++ + with(conn_sups(), fun conn_type/1). distinguished_interesting_sups() -> - [[{rabbit_amqqueue_sup_sup, master}], - [{rabbit_amqqueue_sup_sup, slave}], - conn_sups(reader), - conn_sups(writer), - conn_sups(channel), - conn_sups(other)] + [ + with(queue_sups(), master), + with(queue_sups(), slave), + with(conn_sups(), reader), + with(conn_sups(), writer), + with(conn_sups(), channel), + with(conn_sups(), other)] ++ interesting_sups0(). plugin_sups() -> diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index e17b2afbf3..3ff215f497 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -21,10 +21,11 @@ -compile(export_all). --define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). -define(TIMEOUT, 30000). +-define(VHOST, <<"/">>). -define(VARIABLE_QUEUE_TESTCASES, [ variable_queue_dynamic_duration_change, @@ -253,9 +254,9 @@ msg_store1(_Config) -> MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half - ok = rabbit_variable_queue:stop_msg_store(), - ok = rabbit_variable_queue:start_msg_store( - #{}, {fun ([]) -> finished; + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, + [], {fun ([]) -> finished; ([MsgId|MsgIdsTail]) when length(MsgIdsTail) rem 2 == 0 -> {MsgId, 1, MsgIdsTail}; @@ -330,8 +331,8 @@ msg_store1(_Config) -> passed. restart_msg_store_empty() -> - ok = rabbit_variable_queue:stop_msg_store(), - ok = rabbit_variable_queue:start_msg_store( + ok = rabbit_variable_queue:stop_msg_store(?VHOST), + ok = rabbit_variable_queue:start_msg_store(?VHOST, undefined, {fun (ok) -> finished end, ok}). msg_id_bin(X) -> @@ -376,10 +377,10 @@ on_disk_stop(Pid) -> msg_store_client_init_capture(MsgStore, Ref) -> Pid = spawn(fun on_disk_capture/0), - {Pid, rabbit_msg_store_vhost_sup:client_init( - MsgStore, Ref, fun (MsgIds, _ActionTaken) -> - Pid ! {on_disk, MsgIds} - end, undefined, <<"/">>)}. + {Pid, rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, + fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( @@ -456,14 +457,16 @@ test_msg_store_confirm_timer() -> Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), - MSCState = rabbit_msg_store_vhost_sup:client_init( - ?PERSISTENT_MSG_STORE, Ref, - fun (MsgIds, _ActionTaken) -> - case gb_sets:is_member(MsgId, MsgIds) of - true -> Self ! on_disk; - false -> ok - end - end, undefined, <<"/">>), + MSCState = rabbit_vhost_msg_store:client_init( + ?VHOST, + ?PERSISTENT_MSG_STORE, + Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), ok = msg_store_write([MsgId], MSCState), ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false), ok = msg_store_remove([MsgId], MSCState), @@ -651,8 +654,8 @@ bq_queue_index1(_Config) -> Qi8 end), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), passed. @@ -672,8 +675,8 @@ bq_queue_index_props1(_Config) -> Qi2 end), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), passed. @@ -718,7 +721,7 @@ bq_queue_recover1(Config) -> true, false, [], none, <<"acting-user">>), publish_and_confirm(Q, <<>>, Count), - SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(QPid), + SupPid = rabbit_ct_broker_helpers:get_queue_sup_pid(Q), true = is_pid(SupPid), exit(SupPid, kill), exit(QPid, kill), @@ -726,8 +729,8 @@ bq_queue_recover1(Config) -> receive {'DOWN', MRef, process, QPid, _Info} -> ok after 10000 -> exit(timeout_waiting_for_queue_death) end, - rabbit_amqqueue:stop(), - rabbit_amqqueue:start(rabbit_amqqueue:recover()), + rabbit_amqqueue:stop(?VHOST), + rabbit_amqqueue:start(rabbit_amqqueue:recover(?VHOST)), {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, @@ -1275,14 +1278,14 @@ init_test_queue(QName) -> Res. restart_test_queue(Qi, QName) -> - _ = rabbit_queue_index:terminate([], Qi), - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([QName]), + _ = rabbit_queue_index:terminate(?VHOST, [], Qi), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, [QName]), init_test_queue(QName). empty_test_queue(QName) -> - ok = rabbit_variable_queue:stop(), - {ok, _} = rabbit_variable_queue:start([]), + ok = rabbit_variable_queue:stop(?VHOST), + {ok, _} = rabbit_variable_queue:start(?VHOST, []), {0, 0, Qi} = init_test_queue(QName), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. @@ -1337,7 +1340,7 @@ nop(_) -> ok. nop(_, _) -> ok. msg_store_client_init(MsgStore, Ref) -> - rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, undefined, undefined, <<"/">>). + rabbit_vhost_msg_store:client_init(?VHOST, MsgStore, Ref, undefined, undefined). variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 10d97726f4..09d7ab4e95 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -28,10 +28,10 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, multiple_routing_keys/0]). --export([start/1, stop/0]). +-export([start/2, stop/1]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/3, stop_msg_store/1, init/6]). %%---------------------------------------------------------------------------- %% This test backing queue follows the variable queue implementation, with @@ -87,7 +87,8 @@ io_batch_size, %% default queue or lazy queue - mode + mode, + virtual_host }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -111,10 +112,11 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent_vhost). --define(TRANSIENT_MSG_STORE, msg_store_transient_vhost). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). -define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). +-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -184,7 +186,8 @@ disk_write_count :: non_neg_integer(), io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy' }. + mode :: 'default' | 'lazy', + virtual_host :: rabbit_types:vhost() }. %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. @@ -213,55 +216,39 @@ %% Public API %%---------------------------------------------------------------------------- -start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), +start(VHost, DurableQueues) -> + {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), %% Group recovery terms by vhost. - {[], VhostRefs} = lists:foldl( - fun - %% We need to skip a queue name - (non_clean_shutdown, {[_|QNames], VhostRefs}) -> - {QNames, VhostRefs}; - (Terms, {[QueueName | QNames], VhostRefs}) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {QNames, VhostRefs}; - Ref -> - #resource{virtual_host = VHost} = QueueName, - Refs = case maps:find(VHost, VhostRefs) of - {ok, Val} -> Val; - error -> [] - end, - {QNames, maps:put(VHost, [Ref|Refs], VhostRefs)} - end - end, - {DurableQueues, #{}}, - AllTerms), - start_msg_store(VhostRefs, StartFunState), + ClientRefs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. -stop() -> - ok = stop_msg_store(), - ok = rabbit_queue_index:stop(). - -start_msg_store(Refs, StartFunState) -> - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), - undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), - Refs, StartFunState]), - %% Start message store for all known vhosts - VHosts = rabbit_vhost:list(), - lists:foreach( - fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) - end, - VHosts), +stop(VHost) -> + ok = stop_msg_store(VHost), + ok = rabbit_queue_index:stop(VHost). + +start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> + rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?TRANSIENT_MSG_STORE, + undefined, + ?EMPTY_START_FUN_STATE), + {ok, _} = rabbit_vhost_msg_store:start(VHost, + ?PERSISTENT_MSG_STORE, + Refs, + StartFunState), + rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + +stop_msg_store(VHost) -> + rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), + rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), ok. -stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(Queue, Recover, Callback) -> init( @@ -284,7 +271,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost); %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, @@ -309,10 +296,10 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), + rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient). + PersistentClient, TransientClient, VHost). process_recovery_terms(Terms=non_clean_shutdown) -> {rabbit_guid:gen(), Terms}; @@ -326,7 +313,8 @@ terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, persistent_bytes = PBytes, index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} } = + msg_store_clients = {MSCStateP, MSCStateT}, + virtual_host = VHost } = purge_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; @@ -338,7 +326,7 @@ terminate(_Reason, State) -> {persistent_count, PCount}, {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), + VHost, Terms, IndexState), msg_store_clients = undefined }). %% the only difference between purge and delete is that delete also @@ -990,10 +978,9 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store_vhost_sup:client_init( + rabbit_vhost_msg_store:client_init(VHost, MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end, - VHost). + fun () -> Callback(?MODULE, CloseFDsFun) end). msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -1084,7 +1071,7 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient) -> + PersistentClient, TransientClient, VHost) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {DeltaCount1, DeltaBytes1} = @@ -1148,7 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, - mode = default }, + mode = default, + virtual_host = VHost }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 457a4110fa..6e78c1579f 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -218,12 +218,18 @@ kill_queue(Node, QName) -> queue_pid(Node, QName) -> #amqqueue{pid = QPid, - state = State} = lookup(Node, QName), + state = State, + name = #resource{virtual_host = VHost}} = lookup(Node, QName), case State of - crashed -> case sup_child(Node, rabbit_amqqueue_sup_sup) of + crashed -> + case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of + {error, {queue_supervisor_not_found, Result}} -> {error, no_sup}; + {ok, SPid} -> + case sup_child(Node, SPid) of {ok, _} -> QPid; %% restarting {error, no_child} -> crashed %% given up - end; + end + end; _ -> QPid end. |
