diff options
| -rw-r--r-- | src/rabbit.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 161 |
3 files changed, 99 insertions, 83 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f10eb463ab..a7bebfc127 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -146,6 +146,14 @@ {requires, core_initialized}, {enables, routing_ready}]}). +-rabbit_boot_step({upgrade_queues, + [{description, "codec correctness check"}, + {mfa, {rabbit_upgrade, + maybe_upgrade_queues, + []}}, + {requires, [core_initialized]}, + {enables, recovery}]}). + -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f88b7cc73f..f5437e6b81 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -17,6 +17,7 @@ -module(rabbit_upgrade). -export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0, + maybe_upgrade_queues/0, nodes_running/1, secondary_upgrade/1]). -include("rabbit.hrl"). @@ -252,6 +253,18 @@ maybe_upgrade_local() -> %% ------------------------------------------------------------------- +maybe_upgrade_queues() -> + case rabbit_version:upgrades_required(queues) of + {error, version_not_available} -> version_not_available; + {error, starting_from_scratch} -> starting_from_scratch; + {error, _} = Err -> throw(Err); + {ok, []} -> ok; + {ok, Upgrades} -> apply_upgrades(queues, Upgrades, + fun() -> ok end) + end. + +%% ------------------------------------------------------------------- + apply_upgrades(Scope, Upgrades, Fun) -> ok = rabbit_file:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8460e7ffa9..1adc038a80 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -350,7 +350,8 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). -% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc +-rabbit_upgrade({move_messages_to_vhost_store, queues, []}). + -compile(export_all). -type seq_id() :: non_neg_integer(). @@ -517,7 +518,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, + msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed @@ -538,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, - undefined, AsyncCallback, + undefined, AsyncCallback, VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( @@ -1219,8 +1220,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP), rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, - fun () -> - Callback(?MODULE, CloseFDsFun) + fun () -> + Callback(?MODULE, CloseFDsFun) end, VHost). @@ -2683,84 +2684,83 @@ transform_store(Store, TransformFun) -> rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). move_messages_to_vhost_store() -> - Queues = rabbit_variable_queue:list_persistent_queues(), - % Maybe recover old store. - {RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues), - OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState), - NewStoreSup = rabbit_variable_queue:start_new_store_sup(), - lists:map(fun(Queue) -> - migrate_queue(Queue, OldStore, NewStoreSup) - end, Queues), - % Migrations = spawn_for_each(fun(Queue) -> - % migrate_queue(Queue, OldStore, NewStoreSup) - % end, Queues), - % wait(Migrations), - delete_old_store(OldStore). +rabbit_log:error("MIGRATING!!"), + Queues = list_persistent_queues(), + %% Old msg_store may require recovery. + %% This upgrade step should only be started + %% if we are upgrading from version with old store. + {RecoveryTerms, StartFunState} = start_recovery_terms(Queues), + OldStore = run_old_persistent_store(RecoveryTerms, StartFunState), + %% New store should not be recovered. + NewStoreSup = start_new_store_sup(), + lists:map(fun(Queue) -> + migrate_queue(Queue, OldStore, NewStoreSup) + end, + Queues), + + {ok, Gatherer} = gatherer:start_link(), + lists:map( + fun(Queue) -> + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> + migrate_queue(Queue, OldStore, NewStoreSup), + gatherer:finish(Gatherer) + end) + end, + Queues), + empty = gatherer:out(Gatherer), + ok = gatherer:stop(Gatherer), + + delete_old_store(OldStore), + + ok = rabbit_queue_index:stop(), + ok = rabbit_sup:stop_child(NewStoreSup). migrate_queue(Queue, OldStore, NewStoreSup) -> OldStoreClient = get_old_client(OldStore), NewStoreClient = get_new_store_client(Queue, NewStoreSup), - walk_queue_index( - fun(MessageIdInStore, OldC) -> - case rabbit_msg_store:read(MessageIdInStore, OldStoreClient) of - {{ok, Msg}, OldC1} -> - ok = rabbit_msg_store:write(MessageIdInStore, Msg, NewStoreClient), - OldC1; - _ -> OldC - end - end, - OldStoreClient, - Queue). - -walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) -> - % WARNING: State is being recovered and terminated. This can cause side effects! + #amqqueue{name = QueueName} = Queue, + %% WARNING: During scan_queue_segments queue index state is being recovered + %% and terminated. This can cause side effects! rabbit_queue_index:scan_queue_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState) + %% We migrate only persistent messages, which is stored in msg_store + %% and is not acked yet + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, false, OldC) when is_binary(MsgId) -> - Fun(MsgId, ClientState); - (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) -> - ClientState - end, Client, QueueName). - -spawn_for_each(Fun, List) -> - Ref = erlang:make_ref(), - Self = self(), - Processes = lists:map( - fun(El) -> - spawn_link( - fun() -> - Fun(El), - Self ! {ok, self(), Ref} - end) + migrate_message(MsgId, OldC, NewStoreClient); + (_SeqId, _MsgId, _MsgProps, + _IsPersistent, _IsDelivered, _IsAcked, OldC) -> + OldC end, - List), - {Ref, Processes}. - -wait({Ref, Processes}) -> - lists:foreach( - fun(Proc) -> - receive {ok, Proc, Ref} -> ok - end - end, - Processes). - -get_new_store_client(Queue, NewStoreSup) -> - Vhost = queue_vhost(Queue), - get_new_client(NewStoreSup, Vhost). - -queue_vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost. + OldStoreClient, + QueueName). + +migrate_message(MsgId, OldC, NewC) -> + case rabbit_msg_store:read(MsgId, OldC) of + {{ok, Msg}, OldC1} -> + case rabbit_msg_store:contains(MsgId, NewC) of + false -> ok = rabbit_msg_store:write(MsgId, Msg, NewC); + true -> ok + end, + % TODO: maybe remove in batches? + ok = rabbit_msg_store:remove([MsgId], OldC1), + OldC1; + _ -> OldC + end; -get_new_client(NewStoreSup, VHost) -> - rabbit_msg_store_vhost_sup:client_init(NewStoreSup, +get_new_store_client(#amqqueue{name = #resource{virtual_host = VHost}}, + NewStoreSup) -> + rabbit_msg_store_vhost_sup:client_init(NewStoreSup, rabbit_guid:gen(), - fun(_,_) -> ok end, - fun() -> ok end, + fun(_,_) -> ok end, + fun() -> ok end, VHost). get_old_client(OldStore) -> rabbit_msg_store:client_init(OldStore, rabbit_guid:gen(), - fun(_,_) -> ok end, + fun(_,_) -> ok end, fun() -> ok end). list_persistent_queues() -> @@ -2785,7 +2785,7 @@ start_recovery_terms(Queues) -> end], {Refs, StartFunState}. -run_old_persistent_store(Refs, StartFunState) -> +run_old_persistent_store(Refs, StartFunState) -> OldStoreName = ?PERSISTENT_MSG_STORE, ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, [OldStoreName, rabbit_mnesia:dir(), @@ -2794,22 +2794,17 @@ run_old_persistent_store(Refs, StartFunState) -> start_new_store_sup() -> % Start persistent store sup without recovery. - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, + rabbit_msg_store_vhost_sup, [?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), ?PERSISTENT_MSG_STORE_SUP. delete_old_store(OldStore) -> - gen_server:stop(OldStore), - rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]). - - - -setup() -> - application:load(rabbit), - mnesia:start(), - rabbit_sup:start_link(), - rabbit:start_fhc(), - rabbit_sup:start_restartable_child(rabbit_guid), - rabbit_sup:start_supervisor_child(worker_pool_sup). + ok = rabbit_sup:stop_child(OldStore), + rabbit_file:recursive_delete( + [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), + %% Delete old transient store as well + rabbit_file:recursive_delete( + [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]). |
