diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 39 |
1 files changed, 39 insertions, 0 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a4dc6e4c0d..16e5c513a5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -344,6 +344,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). +-rabbit_upgrade({move_messages_to_vhost_store, local, []}). -type seq_id() :: non_neg_integer(). @@ -2671,3 +2672,41 @@ transform_storage(TransformFun) -> transform_store(Store, TransformFun) -> rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). + +move_messages_to_vhost_store() -> + Queues = list_persistent_queues(), + OldStore = run_old_persistent_store(), + Migrations = spawn_for_each(fun(Queue) -> + migrate_queue(Queue, OldStore) + end, Queues), + wait(Migrations), + delete_old_store(OldStore). + +migrate_queue(Queue, OldStore) -> + OldStoreClient = get_client(OldStore), + NewStoreClient = get_new_store_client(Queue), + walk_queue_index( + fun(MessageIdInStore) -> + Msg = get_msg_from_store(OldStoreClient), + put_message_to_store(Msg, NewStoreClient) + end, + Queue). + + +get_new_store_client(Queue) -> + Vhost = queue_vhost(Queue), + Store = run_persistent_store(Vhost), + get_client(Store). + + +list_persistent_queues() -> + Node = node(), + mnesia:async_dirty( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, + pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) + end). + |
