summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl29
1 files changed, 19 insertions, 10 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7f57c4320c..c711760417 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2715,6 +2715,13 @@ transform_store(Store, TransformFun) ->
move_messages_to_vhost_store() ->
rabbit_log:info("Moving messages to per-vhost message store"),
Queues = list_persistent_queues(),
+ %% Move the queue index for each persistent queue to the new store
+ lists:map(
+ fun(Queue) ->
+ #amqqueue{name = QueueName} = Queue,
+ rabbit_queue_index:move_to_per_vhost_stores(QueueName)
+ end,
+ Queues),
%% Legacy (global) msg_store may require recovery.
%% This upgrade step should only be started
%% if we are upgrading from a pre-3.7.0 version.
@@ -2723,19 +2730,22 @@ move_messages_to_vhost_store() ->
%% New store should not be recovered.
NewStoreSup = start_new_store_sup(),
- {ok, Gatherer} = gatherer:start_link(),
+ % {ok, Gatherer} = gatherer:start_link(),
lists:map(
fun(Queue) ->
- ok = gatherer:fork(Gatherer),
- ok = worker_pool:submit_async(
- fun () ->
- migrate_queue(Queue, OldStore, NewStoreSup),
- gatherer:finish(Gatherer)
- end)
+ migrate_queue(Queue, OldStore, NewStoreSup),
+ #amqqueue{name = QueueName} = Queue,
+ rabbit_log:info("Queue migration finished ~p", [QueueName])
+ % ok = gatherer:fork(Gatherer),
+ % ok = worker_pool:submit_async(
+ % fun () ->
+ % migrate_queue(Queue, OldStore, NewStoreSup),
+ % gatherer:finish(Gatherer)
+ % end)
end,
Queues),
- empty = gatherer:out(Gatherer),
- ok = gatherer:stop(Gatherer),
+ % empty = gatherer:out(Gatherer),
+ % ok = gatherer:stop(Gatherer),
delete_old_store(OldStore),
@@ -2748,7 +2758,6 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
[QueueName#resource.name, QueueName#resource.virtual_host]),
OldStoreClient = get_global_store_client(OldStore),
NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup),
- rabbit_queue_index:move_to_per_vhost_stores(QueueName),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(