summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 39 insertions, 0 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a4dc6e4c0d..16e5c513a5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -344,6 +344,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
+-rabbit_upgrade({move_messages_to_vhost_store, local, []}).
-type seq_id() :: non_neg_integer().
@@ -2671,3 +2672,41 @@ transform_storage(TransformFun) ->
transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+
+move_messages_to_vhost_store() ->
+ Queues = list_persistent_queues(),
+ OldStore = run_old_persistent_store(),
+ Migrations = spawn_for_each(fun(Queue) ->
+ migrate_queue(Queue, OldStore)
+ end, Queues),
+ wait(Migrations),
+ delete_old_store(OldStore).
+
+migrate_queue(Queue, OldStore) ->
+ OldStoreClient = get_client(OldStore),
+ NewStoreClient = get_new_store_client(Queue),
+ walk_queue_index(
+ fun(MessageIdInStore) ->
+ Msg = get_msg_from_store(OldStoreClient),
+ put_message_to_store(Msg, NewStoreClient)
+ end,
+ Queue).
+
+
+get_new_store_client(Queue) ->
+ Vhost = queue_vhost(Queue),
+ Store = run_persistent_store(Vhost),
+ get_client(Store).
+
+
+list_persistent_queues() ->
+ Node = node(),
+ mnesia:async_dirty(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
+ pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node,
+ mnesia:read(rabbit_queue, Name, read) =:= []]))
+ end).
+