summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl44
1 files changed, 40 insertions, 4 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 91ad41b8e0..01c412fd60 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2689,17 +2689,44 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
OldStoreClient = get_old_client(OldStore),
NewStoreClient = get_new_store_client(Queue, NewStoreSup),
walk_queue_index(
- fun(MessageIdInStore) ->
- Msg = get_msg_from_store(OldStoreClient),
- put_message_to_store(Msg, NewStoreClient)
+ fun(MessageIdInStore, OldC) ->
+ case rabbit_msg_store:read(MessageIdInStore, OldStoreClient) of
+ {{ok, Msg}, OldC1} ->
+ ok = rabbit_msg_store:write(MessageIdInStore, Msg, NewStoreClient),
+ OldC1;
+ _ -> OldC
+ end
end,
Queue).
+spawn_for_each(Fun, List) ->
+ Ref = erlang:make_ref(),
+ Self = self(),
+ Processes = lists:map(
+ fun(El) ->
+ spawn_link(
+ fun() ->
+ Fun(El),
+ Self ! {ok, self(), Ref}
+ end)
+ end,
+ List),
+ {Ref, Processes}.
+
+wait({Ref, Processes}) ->
+ lists:foreach(
+ fun(Proc) ->
+ receive {ok, Proc, Ref} -> ok
+ end
+ end,
+ Processes).
get_new_store_client(Queue, NewStoreSup) ->
Vhost = queue_vhost(Queue),
get_new_client(NewStoreSup, Vhost).
+queue_vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost.
+
get_new_client(NewStoreSup, VHost) ->
rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
rabbit_guid:gen(),
@@ -2735,7 +2762,7 @@ start_recovery_terms(Queues) ->
{Refs, StartFunState}.
run_old_persistent_store(Refs, StartFunState) ->
- OldStoreName = old_persistent_msg_store.
+ OldStoreName = old_persistent_msg_store,
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
[OldStoreName, rabbit_mnesia:dir(),
Refs, StartFunState]),
@@ -2753,3 +2780,12 @@ start_new_store_sup() ->
undefined, {fun (ok) -> finished end, ok}]),
?PERSISTENT_MSG_STORE.
+delete_old_store(OldStore) ->
+ gen_server:stop(OldStore),
+ rabbit_file:recursive_delete(
+ filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])).
+
+
+
+
+