diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-04-19 17:02:52 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-10-20 14:49:52 +0100 |
| commit | d1bffd00f326c4e43cbfaadc20a306d1a598ac9a (patch) | |
| tree | d8552acee8697e7ee425bf264d48926246e87165 | |
| parent | ec15e971cfc8a44b5279f5b8350663cb373b4c1c (diff) | |
| download | rabbitmq-server-git-d1bffd00f326c4e43cbfaadc20a306d1a598ac9a.tar.gz | |
Work in progress: Migration to vhost based message store
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
1 files changed, 51 insertions, 8 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 16e5c513a5..91ad41b8e0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2675,16 +2675,19 @@ transform_store(Store, TransformFun) -> move_messages_to_vhost_store() -> Queues = list_persistent_queues(), - OldStore = run_old_persistent_store(), + % Maybe recover old store. + {RecoveryTerms, StartFunState} = start_recovery_terms(Queues), + OldStore = run_old_persistent_store(RecoveryTerms, StartFunState), + NewStoreSup = start_new_store_sup(), Migrations = spawn_for_each(fun(Queue) -> - migrate_queue(Queue, OldStore) + migrate_queue(Queue, OldStore, NewStoreSup) end, Queues), wait(Migrations), delete_old_store(OldStore). -migrate_queue(Queue, OldStore) -> - OldStoreClient = get_client(OldStore), - NewStoreClient = get_new_store_client(Queue), +migrate_queue(Queue, OldStore, NewStoreSup) -> + OldStoreClient = get_old_client(OldStore), + NewStoreClient = get_new_store_client(Queue, NewStoreSup), walk_queue_index( fun(MessageIdInStore) -> Msg = get_msg_from_store(OldStoreClient), @@ -2693,11 +2696,22 @@ migrate_queue(Queue, OldStore) -> Queue). -get_new_store_client(Queue) -> +get_new_store_client(Queue, NewStoreSup) -> Vhost = queue_vhost(Queue), - Store = run_persistent_store(Vhost), - get_client(Store). + get_new_client(NewStoreSup, Vhost). +get_new_client(NewStoreSup, VHost) -> + rabbit_msg_store_vhost_sup:client_init(NewStoreSup, + rabbit_guid:gen(), + fun(_,_) -> ok end, + fun() -> ok end, + VHost). + +get_old_client(OldStore) -> + rabbit_msg_store:client_init(OldStore, + rabbit_guid:gen(), + fun(_,_) -> ok end, + fun() -> ok end). list_persistent_queues() -> Node = node(), @@ -2710,3 +2724,32 @@ list_persistent_queues() -> mnesia:read(rabbit_queue, Name, read) =:= []])) end). +start_recovery_terms(Queues) -> + {AllTerms, StartFunState} = rabbit_queue_index:start(Queues), + Refs = [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, + begin + Ref = proplists:get_value(persistent_ref, Terms), + Ref =/= undefined + end], + {Refs, StartFunState}. + +run_old_persistent_store(Refs, StartFunState) -> + OldStoreName = old_persistent_msg_store. + ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, + [OldStoreName, rabbit_mnesia:dir(), + Refs, StartFunState]), + OldStoreName. + +run_persistent_store(Vhost) -> + + + ?PERSISTENT_MSG_STORE. + +start_new_store_sup() -> + % Start persistent store sup without recovery. + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, + [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + undefined, {fun (ok) -> finished end, ok}]), + ?PERSISTENT_MSG_STORE. + |
