diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-11-28 16:27:45 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-11-28 16:27:45 +0000 |
| commit | 55451523b84168ceeb36398f7a8e6734b35e969a (patch) | |
| tree | b960c8e810a97ef4fa221984a92666e7ae1687ea /src | |
| parent | 03bdc78d54c0bf1c1c9b8bf7fa544ef511d08a17 (diff) | |
| download | rabbitmq-server-git-55451523b84168ceeb36398f7a8e6734b35e969a.tar.gz | |
Migrate queues in batches of 100 in parallel. Write recovery terms for migrated queues.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 84 |
2 files changed, 54 insertions, 36 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 07ae20f19e..4d2e865d3b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -26,7 +26,7 @@ -export([scan_queue_segments/3]). %% Migrates from global to per-vhost message stores --export([move_to_per_vhost_stores/1]). +-export([move_to_per_vhost_stores/1, update_recovery_term/2]). -define(CLEAN_FILENAME, "clean.dot"). @@ -1419,3 +1419,7 @@ move_to_per_vhost_stores(#resource{} = QueueName) -> [QueueName]) end, ok. + +update_recovery_term(#resource{} = QueueName, Term) -> + Key = queue_name_to_dir_name(QueueName), + rabbit_recovery_terms:store(Key, Term).
\ No newline at end of file diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c711760417..bafbc1d1a9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -37,6 +37,8 @@ -export([stop_vhost_msg_store/1]). -include_lib("stdlib/include/qlc.hrl"). +-define(QUEUE_MIGRATION_BATCH_SIZE, 100). + %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on %% disk, or both. Persistent messages will have both message and @@ -351,7 +353,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). --rabbit_upgrade({move_messages_to_vhost_store, queues, []}). +-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}). -compile(export_all). @@ -2716,7 +2718,7 @@ move_messages_to_vhost_store() -> rabbit_log:info("Moving messages to per-vhost message store"), Queues = list_persistent_queues(), %% Move the queue index for each persistent queue to the new store - lists:map( + lists:foreach( fun(Queue) -> #amqqueue{name = QueueName} = Queue, rabbit_queue_index:move_to_per_vhost_stores(QueueName) @@ -2725,39 +2727,49 @@ move_messages_to_vhost_store() -> %% Legacy (global) msg_store may require recovery. %% This upgrade step should only be started %% if we are upgrading from a pre-3.7.0 version. - {RecoveryTerms, StartFunState} = start_recovery_terms(Queues), - OldStore = run_old_persistent_store(RecoveryTerms, StartFunState), + {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues), + + OldStore = run_old_persistent_store(RecoveryRefs, StartFunState), %% New store should not be recovered. NewStoreSup = start_new_store_sup(), + in_batches(?QUEUE_MIGRATION_BATCH_SIZE, + {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, + QueuesWithTerms), - % {ok, Gatherer} = gatherer:start_link(), - lists:map( - fun(Queue) -> - migrate_queue(Queue, OldStore, NewStoreSup), - #amqqueue{name = QueueName} = Queue, - rabbit_log:info("Queue migration finished ~p", [QueueName]) - % ok = gatherer:fork(Gatherer), - % ok = worker_pool:submit_async( - % fun () -> - % migrate_queue(Queue, OldStore, NewStoreSup), - % gatherer:finish(Gatherer) - % end) - end, - Queues), - % empty = gatherer:out(Gatherer), - % ok = gatherer:stop(Gatherer), - + rabbit_log:info("Message store migration finished"), delete_old_store(OldStore), ok = rabbit_queue_index:stop(), - ok = rabbit_sup:stop_child(NewStoreSup). + ok = rabbit_sup:stop_child(NewStoreSup), + ok. -migrate_queue(Queue, OldStore, NewStoreSup) -> - #amqqueue{name = QueueName} = Queue, +in_batches(Size, MFA, List) -> + in_batches(Size, 1, MFA, List). + +in_batches(_, _, _, []) -> ok; +in_batches(Size, BatchNum, MFA, List) -> + {Batch, Tail} = case Size > length(List) of + true -> {List, []}; + false -> lists:split(Size, List) + end, + rabbit_log:info("Migrating batch ~p of ~p queues ~n", [BatchNum, Size]), + {M, F, A} = MFA, + Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ], + lists:foreach(fun(Key) -> + case rpc:yield(Key) of + {badrpc, Err} -> throw(Err); + _ -> ok + end + end, + Keys), + rabbit_log:info("Batch ~p of ~p queues migrated ~n", [BatchNum, Size]), + in_batches(Size, BatchNum + 1, MFA, Tail). + +migrate_queue({QueueName, RecoveryTerm}, OldStore, NewStoreSup) -> rabbit_log:info("Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", [QueueName#resource.name, QueueName#resource.virtual_host]), OldStoreClient = get_global_store_client(OldStore), - NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup), + NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup), %% WARNING: During scan_queue_segments queue index state is being recovered %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( @@ -2771,23 +2783,25 @@ migrate_queue(Queue, OldStore, NewStoreSup) -> OldC end, OldStoreClient, - QueueName). + QueueName), + rabbit_msg_store:client_terminate(OldStoreClient), + rabbit_msg_store:client_terminate(NewStoreClient), + NewClientRef = rabbit_msg_store:client_ref(NewStoreClient), + NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm, + {persistent_ref, NewClientRef}), + rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm), + rabbit_log:info("Queue migration finished ~p", [QueueName]), + {QueueName, NewClientRef}. migrate_message(MsgId, OldC, NewC) -> case rabbit_msg_store:read(MsgId, OldC) of {{ok, Msg}, OldC1} -> - case rabbit_msg_store:contains(MsgId, NewC) of - false -> ok = rabbit_msg_store:write(MsgId, Msg, NewC); - true -> ok - end, - % TODO: maybe remove in batches? - ok = rabbit_msg_store:remove([MsgId], OldC1), + ok = rabbit_msg_store:write(MsgId, Msg, NewC), OldC1; _ -> OldC end. -get_per_vhost_store_client(#amqqueue{name = #resource{virtual_host = VHost}}, - NewStoreSup) -> +get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) -> rabbit_msg_store_vhost_sup:client_init(NewStoreSup, rabbit_guid:gen(), fun(_,_) -> ok end, @@ -2820,7 +2834,7 @@ start_recovery_terms(Queues) -> Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - {Refs, StartFunState}. + {lists:zip(QueueNames, AllTerms), Refs, StartFunState}. run_old_persistent_store(Refs, StartFunState) -> OldStoreName = ?PERSISTENT_MSG_STORE, |
