summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-04-19 17:02:52 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-10-20 14:49:52 +0100
commitd1bffd00f326c4e43cbfaadc20a306d1a598ac9a (patch)
treed8552acee8697e7ee425bf264d48926246e87165
parentec15e971cfc8a44b5279f5b8350663cb373b4c1c (diff)
downloadrabbitmq-server-git-d1bffd00f326c4e43cbfaadc20a306d1a598ac9a.tar.gz
Work in progress: Migration to vhost based message store
-rw-r--r--src/rabbit_variable_queue.erl59
1 files changed, 51 insertions, 8 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 16e5c513a5..91ad41b8e0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2675,16 +2675,19 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
Queues = list_persistent_queues(),
- OldStore = run_old_persistent_store(),
+ % Maybe recover old store.
+ {RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
+ OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
+ NewStoreSup = start_new_store_sup(),
Migrations = spawn_for_each(fun(Queue) ->
- migrate_queue(Queue, OldStore)
+ migrate_queue(Queue, OldStore, NewStoreSup)
end, Queues),
wait(Migrations),
delete_old_store(OldStore).
-migrate_queue(Queue, OldStore) ->
- OldStoreClient = get_client(OldStore),
- NewStoreClient = get_new_store_client(Queue),
+migrate_queue(Queue, OldStore, NewStoreSup) ->
+ OldStoreClient = get_old_client(OldStore),
+ NewStoreClient = get_new_store_client(Queue, NewStoreSup),
walk_queue_index(
fun(MessageIdInStore) ->
Msg = get_msg_from_store(OldStoreClient),
@@ -2693,11 +2696,22 @@ migrate_queue(Queue, OldStore) ->
Queue).
-get_new_store_client(Queue) ->
+get_new_store_client(Queue, NewStoreSup) ->
Vhost = queue_vhost(Queue),
- Store = run_persistent_store(Vhost),
- get_client(Store).
+ get_new_client(NewStoreSup, Vhost).
+get_new_client(NewStoreSup, VHost) ->
+ rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end,
+ VHost).
+
+get_old_client(OldStore) ->
+ rabbit_msg_store:client_init(OldStore,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end).
list_persistent_queues() ->
Node = node(),
@@ -2710,3 +2724,32 @@ list_persistent_queues() ->
mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
+start_recovery_terms(Queues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(Queues),
+ Refs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ {Refs, StartFunState}.
+
+run_old_persistent_store(Refs, StartFunState) ->
+ OldStoreName = old_persistent_msg_store.
+ ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
+ [OldStoreName, rabbit_mnesia:dir(),
+ Refs, StartFunState]),
+ OldStoreName.
+
+run_persistent_store(Vhost) ->
+
+
+ ?PERSISTENT_MSG_STORE.
+
+start_new_store_sup() ->
+ % Start persistent store sup without recovery.
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ undefined, {fun (ok) -> finished end, ok}]),
+ ?PERSISTENT_MSG_STORE.
+