summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-11-28 16:27:45 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2016-11-28 16:27:45 +0000
commit55451523b84168ceeb36398f7a8e6734b35e969a (patch)
treeb960c8e810a97ef4fa221984a92666e7ae1687ea /src
parent03bdc78d54c0bf1c1c9b8bf7fa544ef511d08a17 (diff)
downloadrabbitmq-server-git-55451523b84168ceeb36398f7a8e6734b35e969a.tar.gz
Migrate queues in batches of 100 in parallel. Write recovery terms for migrated queues.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_variable_queue.erl84
2 files changed, 54 insertions, 36 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 07ae20f19e..4d2e865d3b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -26,7 +26,7 @@
-export([scan_queue_segments/3]).
%% Migrates from global to per-vhost message stores
--export([move_to_per_vhost_stores/1]).
+-export([move_to_per_vhost_stores/1, update_recovery_term/2]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -1419,3 +1419,7 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
[QueueName])
end,
ok.
+
+update_recovery_term(#resource{} = QueueName, Term) ->
+ Key = queue_name_to_dir_name(QueueName),
+ rabbit_recovery_terms:store(Key, Term). \ No newline at end of file
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c711760417..bafbc1d1a9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -37,6 +37,8 @@
-export([stop_vhost_msg_store/1]).
-include_lib("stdlib/include/qlc.hrl").
+-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
@@ -351,7 +353,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
--rabbit_upgrade({move_messages_to_vhost_store, queues, []}).
+-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).
-compile(export_all).
@@ -2716,7 +2718,7 @@ move_messages_to_vhost_store() ->
rabbit_log:info("Moving messages to per-vhost message store"),
Queues = list_persistent_queues(),
%% Move the queue index for each persistent queue to the new store
- lists:map(
+ lists:foreach(
fun(Queue) ->
#amqqueue{name = QueueName} = Queue,
rabbit_queue_index:move_to_per_vhost_stores(QueueName)
@@ -2725,39 +2727,49 @@ move_messages_to_vhost_store() ->
%% Legacy (global) msg_store may require recovery.
%% This upgrade step should only be started
%% if we are upgrading from a pre-3.7.0 version.
- {RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
- OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
+ {QueuesWithTerms, RecoveryRefs, StartFunState} = start_recovery_terms(Queues),
+
+ OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
%% New store should not be recovered.
NewStoreSup = start_new_store_sup(),
+ in_batches(?QUEUE_MIGRATION_BATCH_SIZE,
+ {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]},
+ QueuesWithTerms),
- % {ok, Gatherer} = gatherer:start_link(),
- lists:map(
- fun(Queue) ->
- migrate_queue(Queue, OldStore, NewStoreSup),
- #amqqueue{name = QueueName} = Queue,
- rabbit_log:info("Queue migration finished ~p", [QueueName])
- % ok = gatherer:fork(Gatherer),
- % ok = worker_pool:submit_async(
- % fun () ->
- % migrate_queue(Queue, OldStore, NewStoreSup),
- % gatherer:finish(Gatherer)
- % end)
- end,
- Queues),
- % empty = gatherer:out(Gatherer),
- % ok = gatherer:stop(Gatherer),
-
+ rabbit_log:info("Message store migration finished"),
delete_old_store(OldStore),
ok = rabbit_queue_index:stop(),
- ok = rabbit_sup:stop_child(NewStoreSup).
+ ok = rabbit_sup:stop_child(NewStoreSup),
+ ok.
-migrate_queue(Queue, OldStore, NewStoreSup) ->
- #amqqueue{name = QueueName} = Queue,
+in_batches(Size, MFA, List) ->
+ in_batches(Size, 1, MFA, List).
+
+in_batches(_, _, _, []) -> ok;
+in_batches(Size, BatchNum, MFA, List) ->
+ {Batch, Tail} = case Size > length(List) of
+ true -> {List, []};
+ false -> lists:split(Size, List)
+ end,
+ rabbit_log:info("Migrating batch ~p of ~p queues ~n", [BatchNum, Size]),
+ {M, F, A} = MFA,
+ Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
+ lists:foreach(fun(Key) ->
+ case rpc:yield(Key) of
+ {badrpc, Err} -> throw(Err);
+ _ -> ok
+ end
+ end,
+ Keys),
+ rabbit_log:info("Batch ~p of ~p queues migrated ~n", [BatchNum, Size]),
+ in_batches(Size, BatchNum + 1, MFA, Tail).
+
+migrate_queue({QueueName, RecoveryTerm}, OldStore, NewStoreSup) ->
rabbit_log:info("Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
[QueueName#resource.name, QueueName#resource.virtual_host]),
OldStoreClient = get_global_store_client(OldStore),
- NewStoreClient = get_per_vhost_store_client(Queue, NewStoreSup),
+ NewStoreClient = get_per_vhost_store_client(QueueName, NewStoreSup),
%% WARNING: During scan_queue_segments queue index state is being recovered
%% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
@@ -2771,23 +2783,25 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
OldC
end,
OldStoreClient,
- QueueName).
+ QueueName),
+ rabbit_msg_store:client_terminate(OldStoreClient),
+ rabbit_msg_store:client_terminate(NewStoreClient),
+ NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),
+ NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm,
+ {persistent_ref, NewClientRef}),
+ rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm),
+ rabbit_log:info("Queue migration finished ~p", [QueueName]),
+ {QueueName, NewClientRef}.
migrate_message(MsgId, OldC, NewC) ->
case rabbit_msg_store:read(MsgId, OldC) of
{{ok, Msg}, OldC1} ->
- case rabbit_msg_store:contains(MsgId, NewC) of
- false -> ok = rabbit_msg_store:write(MsgId, Msg, NewC);
- true -> ok
- end,
- % TODO: maybe remove in batches?
- ok = rabbit_msg_store:remove([MsgId], OldC1),
+ ok = rabbit_msg_store:write(MsgId, Msg, NewC),
OldC1;
_ -> OldC
end.
-get_per_vhost_store_client(#amqqueue{name = #resource{virtual_host = VHost}},
- NewStoreSup) ->
+get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStoreSup) ->
rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
rabbit_guid:gen(),
fun(_,_) -> ok end,
@@ -2820,7 +2834,7 @@ start_recovery_terms(Queues) ->
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- {Refs, StartFunState}.
+ {lists:zip(QueueNames, AllTerms), Refs, StartFunState}.
run_old_persistent_store(Refs, StartFunState) ->
OldStoreName = ?PERSISTENT_MSG_STORE,