diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7f57c4320c..c711760417 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2715,6 +2715,13 @@ transform_store(Store, TransformFun) -> move_messages_to_vhost_store() -> rabbit_log:info("Moving messages to per-vhost message store"), Queues = list_persistent_queues(), + %% Move the queue index for each persistent queue to the new store + lists:map( + fun(Queue) -> + #amqqueue{name = QueueName} = Queue, + rabbit_queue_index:move_to_per_vhost_stores(QueueName) + end, + Queues), %% Legacy (global) msg_store may require recovery. %% This upgrade step should only be started %% if we are upgrading from a pre-3.7.0 version. @@ -2723,19 +2730,22 @@ move_messages_to_vhost_store() -> %% New store should not be recovered. NewStoreSup = start_new_store_sup(), - {ok, Gatherer} = gatherer:start_link(), + % {ok, Gatherer} = gatherer:start_link(), lists:map( fun(Queue) -> - ok = gatherer:fork(Gatherer), - ok = worker_pool:submit_async( - fun () -> - migrate_queue(Queue, OldStore, NewStoreSup), - gatherer:finish(Gatherer) - end) + migrate_queue(Queue, OldStore, NewStoreSup), + #amqqueue{name = QueueName} = Queue, + rabbit_log:info("Queue migration finished ~p", [QueueName]) + % ok = gatherer:fork(Gatherer), + % ok = worker_pool:submit_async( + % fun () -> + % migrate_queue(Queue, OldStore, NewStoreSup), + % gatherer:finish(Gatherer) + % end) end, Queues), - empty = gatherer:out(Gatherer), - ok = gatherer:stop(Gatherer), + % empty = gatherer:out(Gatherer), + % ok = gatherer:stop(Gatherer), delete_old_store(OldStore), @@ -2748,7 +2758,6 @@ migrate_queue(Queue, OldStore, NewStoreSup) -> [QueueName#resource.name, QueueName#resource.virtual_host]), OldStoreClient = get_global_store_client(OldStore), NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup), - rabbit_queue_index:move_to_per_vhost_stores(QueueName), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( |
