diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 44 |
1 files changed, 40 insertions, 4 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 91ad41b8e0..01c412fd60 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2689,17 +2689,44 @@ migrate_queue(Queue, OldStore, NewStoreSup) -> OldStoreClient = get_old_client(OldStore), NewStoreClient = get_new_store_client(Queue, NewStoreSup), walk_queue_index( - fun(MessageIdInStore) -> - Msg = get_msg_from_store(OldStoreClient), - put_message_to_store(Msg, NewStoreClient) + fun(MessageIdInStore, OldC) -> + case rabbit_msg_store:read(MessageIdInStore, OldStoreClient) of + {{ok, Msg}, OldC1} -> + ok = rabbit_msg_store:write(MessageIdInStore, Msg, NewStoreClient), + OldC1; + _ -> OldC + end end, Queue). +spawn_for_each(Fun, List) -> + Ref = erlang:make_ref(), + Self = self(), + Processes = lists:map( + fun(El) -> + spawn_link( + fun() -> + Fun(El), + Self ! {ok, self(), Ref} + end) + end, + List), + {Ref, Processes}. + +wait({Ref, Processes}) -> + lists:foreach( + fun(Proc) -> + receive {ok, Proc, Ref} -> ok + end + end, + Processes). get_new_store_client(Queue, NewStoreSup) -> Vhost = queue_vhost(Queue), get_new_client(NewStoreSup, Vhost). +queue_vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost. + get_new_client(NewStoreSup, VHost) -> rabbit_msg_store_vhost_sup:client_init(NewStoreSup, rabbit_guid:gen(), @@ -2735,7 +2762,7 @@ start_recovery_terms(Queues) -> {Refs, StartFunState}. run_old_persistent_store(Refs, StartFunState) -> - OldStoreName = old_persistent_msg_store. + OldStoreName = old_persistent_msg_store, ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, [OldStoreName, rabbit_mnesia:dir(), Refs, StartFunState]), @@ -2753,3 +2780,12 @@ start_new_store_sup() -> undefined, {fun (ok) -> finished end, ok}]), ?PERSISTENT_MSG_STORE. +delete_old_store(OldStore) -> + gen_server:stop(OldStore), + rabbit_file:recursive_delete( + filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])). + + + + + |
