diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dda7605969..3295da6b8d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -508,12 +508,16 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> ?MSG_STORE_RESTORE_BATCH_SIZE), in_batches(RestoreBatchSize, {rabbit_variable_queue, add_vhost_msg_store, []}, - VHosts), + VHosts, + "Recovering batch ~p of ~p vhosts ~n", + "Batch ~p of ~p vhsots recovered ~n"), ok. add_vhost_msg_store(VHost) -> + rabbit_log:info("Starting message store vor vhost ~p~n", [VHost]), rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost). + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), + rabbit_log:info("Message store is started vor vhost ~p~n", [VHost]). stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), @@ -2761,7 +2765,9 @@ move_messages_to_vhost_store() -> ?QUEUE_MIGRATION_BATCH_SIZE), in_batches(MigrationBatchSize, {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, - QueuesWithTerms), + QueuesWithTerms, + "Migrating batch ~p of ~p queues ~n", + "Batch ~p of ~p queues migrated ~n"), log_upgrade("Message store migration finished"), delete_old_store(OldStore), @@ -2770,16 +2776,16 @@ move_messages_to_vhost_store() -> ok = rabbit_sup:stop_child(NewStoreSup), ok. -in_batches(Size, MFA, List) -> - in_batches(Size, 1, MFA, List). +in_batches(Size, MFA, List, MessageStart, MessageEnd) -> + in_batches(Size, 1, MFA, List, MessageStart, MessageEnd). -in_batches(_, _, _, []) -> ok; -in_batches(Size, BatchNum, MFA, List) -> +in_batches(_, _, _, [], _, _) -> ok; +in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> {Batch, Tail} = case Size > length(List) of true -> {List, []}; false -> lists:split(Size, List) end, - log_upgrade("Migrating batch ~p of ~p queues ~n", [BatchNum, Size]), + log_upgrade(MessageStart, [BatchNum, Size]), {M, F, A} = MFA, Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ], lists:foreach(fun(Key) -> @@ -2789,8 +2795,8 @@ in_batches(Size, BatchNum, MFA, List) -> end end, Keys), - log_upgrade("Batch ~p of ~p queues migrated ~n", [BatchNum, Size]), - in_batches(Size, BatchNum + 1, MFA, Tail). + log_upgrade(MessageEnd, [BatchNum, Size]), + in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). migrate_queue({QueueName, RecoveryTerm}, OldStore, NewStoreSup) -> log_upgrade_verbose( |
