diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-11-13 01:01:09 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-11-13 01:01:09 +0300 |
| commit | 7a84a2492f541166591eee8f412c09911b4e13ab (patch) | |
| tree | be40f63382c49925ef3db732da87b6abcd7f2fdc | |
| parent | 245fd0804d451cef7f55a07e517b8ba176294e82 (diff) | |
| download | rabbitmq-server-git-7a84a2492f541166591eee8f412c09911b4e13ab.tar.gz | |
Towards functional message store migration fn
| -rw-r--r-- | src/rabbit_msg_store.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_sup.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 30 |
5 files changed, 43 insertions, 25 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1e929c7f7c..cb87af70e1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/4, successfully_recovered_state/1, +-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, write/3, write_flow/3, read/2, contains/2, remove/2]). @@ -479,10 +479,15 @@ start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> [Name, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). +start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> + gen_server2:start_link({local, Name}, ?MODULE, + [Name, Dir, ClientRefs, StartupFunState], + [{timeout, infinity}]). + successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server) -> +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( @@ -548,7 +553,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). -set_maximum_since_use(Server, Age) when is_pid(Server) -> +set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index 6b4988b06d..65bf2c74e8 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -26,7 +26,7 @@ add_vhost(Name, VHost) -> start_store_for_vhost(Name, ClientRefs, StartupFunState, VHost) -> case vhost_store_pid(Name, VHost) of no_pid -> - VHostDir = vhost_store_dir(VHost), + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), ok = rabbit_file:ensure_dir(VHostDir), rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of @@ -81,8 +81,3 @@ successfully_recovered_state(Name, VHost) -> Pid when is_pid(Pid) -> rabbit_msg_store:successfully_recovered_state(Pid) end. - -vhost_store_dir(VHost) -> - Dir = rabbit_mnesia:dir(), - EncodedName = list_to_binary(rabbit_vhost:dir(VHost)), - binary_to_list(filename:join([Dir, EncodedName])). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index ad70540e5b..a457938dc9 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, start_child/3, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_child/4, start_supervisor_child/1, start_supervisor_child/2, start_supervisor_child/3, start_restartable_child/1, start_restartable_child/2, @@ -37,6 +37,7 @@ -spec start_child(atom()) -> 'ok'. -spec start_child(atom(), [any()]) -> 'ok'. -spec start_child(atom(), atom(), [any()]) -> 'ok'. +-spec start_child(atom(), atom(), atom(), [any()]) -> 'ok'. -spec start_supervisor_child(atom()) -> 'ok'. -spec start_supervisor_child(atom(), [any()]) -> 'ok'. -spec start_supervisor_child(atom(), atom(), [any()]) -> 'ok'. @@ -60,6 +61,13 @@ start_child(ChildId, Mod, Args) -> {ChildId, {Mod, start_link, Args}, transient, ?WORKER_WAIT, worker, [Mod]})). +start_child(ChildId, Mod, Fun, Args) -> + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, Fun, Args}, + transient, ?WORKER_WAIT, worker, [Mod]})). + + start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8995154886..fc8dee8385 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2715,9 +2715,9 @@ transform_store(Store, TransformFun) -> move_messages_to_vhost_store() -> rabbit_log:info("Moving messages to per-vhsot message store"), Queues = list_persistent_queues(), - %% Old msg_store may require recovery. + %% Legacy (global) msg_store may require recovery. %% This upgrade step should only be started - %% if we are upgrading from version with old store. + %% if we are upgrading from a pre-3.7.0 version. {RecoveryTerms, StartFunState} = start_recovery_terms(Queues), OldStore = run_old_persistent_store(RecoveryTerms, StartFunState), %% New store should not be recovered. @@ -2743,9 +2743,11 @@ move_messages_to_vhost_store() -> ok = rabbit_sup:stop_child(NewStoreSup). migrate_queue(Queue, OldStore, NewStoreSup) -> + #amqqueue{name = QueueName} = Queue, + rabbit_log:info("Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", + [QueueName#resource.name, QueueName#resource.virtual_host]), OldStoreClient = get_global_store_client(OldStore), NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup), - #amqqueue{name = QueueName} = Queue, rabbit_queue_index:move_to_per_vhost_stores(QueueName), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! @@ -2813,7 +2815,7 @@ start_recovery_terms(Queues) -> run_old_persistent_store(Refs, StartFunState) -> OldStoreName = ?PERSISTENT_MSG_STORE, - ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, + ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link, [OldStoreName, rabbit_mnesia:dir(), Refs, StartFunState]), OldStoreName. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 26fdaa0db8..bbf77f290f 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -23,7 +23,7 @@ -export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). --export([dir/1]). +-export([dir/1, msg_store_dir_path/1]). -export([purge_messages/1]). -spec add(rabbit_types:vhost()) -> 'ok'. @@ -96,15 +96,15 @@ delete(VHostPath) -> [ok = Fun() || Fun <- Funs], ok. -purge_messages(VHostPath) -> - VhostDir = filename:join(rabbit_mnesia:dir(), dir(VHostPath)), - rabbit_log:info("Deleting vhost message store directory at '~s'~n", [VhostDir]), +purge_messages(VHost) -> + VhostDir = msg_store_dir_path(VHost), + rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), %% Message store is stopped to close file handles - rabbit_variable_queue:stop_vhost_msg_store(VHostPath), + rabbit_variable_queue:stop_vhost_msg_store(VHost), ok = rabbit_file:recursive_delete([VhostDir]), - %% Second terminate is made in case message store is - %% restarted during deletion - rabbit_variable_queue:stop_vhost_msg_store(VHostPath). + %% Ensure the store is terminated even if it was restarted during the delete operation + %% above. + rabbit_variable_queue:stop_vhost_msg_store(VHost). assert_benign(ok) -> ok; assert_benign({ok, _}) -> ok; @@ -179,6 +179,17 @@ set_limits(VHost = #vhost{}, undefined) -> set_limits(VHost = #vhost{}, Limits) -> VHost#vhost{limits = Limits}. + +dir(Vhost) -> + <<Num:128>> = erlang:md5(term_to_binary(Vhost)), + rabbit_misc:format("~.36B", [Num]). + +msg_store_dir_path(VHost) -> + Dir = rabbit_mnesia:dir(), + EncodedName = list_to_binary(dir(VHost)), + binary_to_list(filename:join([Dir, "msg_stores", "vhosts", EncodedName])). + + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. @@ -198,6 +209,3 @@ info_all(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, list()). -dir(Vhost) -> - <<Num:128>> = erlang:md5(term_to_binary(Vhost)), - rabbit_misc:format("~.36B", [Num]). |
