summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-04-21 13:19:18 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-10-20 14:49:52 +0100
commit325e2fcd8a66a2db702e819cc8da1bafde4ac8ed (patch)
tree605567889215df5d39a852d926a89cd1198ae3f2
parent492e23bf188820756e07076c4539692376cfcbc6 (diff)
downloadrabbitmq-server-git-325e2fcd8a66a2db702e819cc8da1bafde4ac8ed.tar.gz
New upgrade scope `queues` started by boot step before queue recovery
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_upgrade.erl13
-rw-r--r--src/rabbit_variable_queue.erl161
3 files changed, 99 insertions, 83 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f10eb463ab..a7bebfc127 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -146,6 +146,14 @@
{requires, core_initialized},
{enables, routing_ready}]}).
+-rabbit_boot_step({upgrade_queues,
+ [{description, "codec correctness check"},
+ {mfa, {rabbit_upgrade,
+ maybe_upgrade_queues,
+ []}},
+ {requires, [core_initialized]},
+ {enables, recovery}]}).
+
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index f88b7cc73f..f5437e6b81 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -17,6 +17,7 @@
-module(rabbit_upgrade).
-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
+ maybe_upgrade_queues/0,
nodes_running/1, secondary_upgrade/1]).
-include("rabbit.hrl").
@@ -252,6 +253,18 @@ maybe_upgrade_local() ->
%% -------------------------------------------------------------------
+maybe_upgrade_queues() ->
+ case rabbit_version:upgrades_required(queues) of
+ {error, version_not_available} -> version_not_available;
+ {error, starting_from_scratch} -> starting_from_scratch;
+ {error, _} = Err -> throw(Err);
+ {ok, []} -> ok;
+ {ok, Upgrades} -> apply_upgrades(queues, Upgrades,
+ fun() -> ok end)
+ end.
+
+%% -------------------------------------------------------------------
+
apply_upgrades(Scope, Upgrades, Fun) ->
ok = rabbit_file:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8460e7ffa9..1adc038a80 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -350,7 +350,8 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
-% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
+-rabbit_upgrade({move_messages_to_vhost_store, queues, []}).
+
-compile(export_all).
-type seq_id() :: non_neg_integer().
@@ -517,7 +518,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
+ msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
@@ -538,7 +539,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
- undefined, AsyncCallback,
+ undefined, AsyncCallback,
VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
@@ -1219,8 +1220,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
- fun () ->
- Callback(?MODULE, CloseFDsFun)
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
end,
VHost).
@@ -2683,84 +2684,83 @@ transform_store(Store, TransformFun) ->
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
move_messages_to_vhost_store() ->
- Queues = rabbit_variable_queue:list_persistent_queues(),
- % Maybe recover old store.
- {RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
- OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
- NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
- lists:map(fun(Queue) ->
- migrate_queue(Queue, OldStore, NewStoreSup)
- end, Queues),
- % Migrations = spawn_for_each(fun(Queue) ->
- % migrate_queue(Queue, OldStore, NewStoreSup)
- % end, Queues),
- % wait(Migrations),
- delete_old_store(OldStore).
+rabbit_log:error("MIGRATING!!"),
+ Queues = list_persistent_queues(),
+ %% Old msg_store may require recovery.
+ %% This upgrade step should only be started
+ %% if we are upgrading from version with old store.
+ {RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
+ OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
+ %% New store should not be recovered.
+ NewStoreSup = start_new_store_sup(),
+ lists:map(fun(Queue) ->
+ migrate_queue(Queue, OldStore, NewStoreSup)
+ end,
+ Queues),
+
+ {ok, Gatherer} = gatherer:start_link(),
+ lists:map(
+ fun(Queue) ->
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ migrate_queue(Queue, OldStore, NewStoreSup),
+ gatherer:finish(Gatherer)
+ end)
+ end,
+ Queues),
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+
+ delete_old_store(OldStore),
+
+ ok = rabbit_queue_index:stop(),
+ ok = rabbit_sup:stop_child(NewStoreSup).
migrate_queue(Queue, OldStore, NewStoreSup) ->
OldStoreClient = get_old_client(OldStore),
NewStoreClient = get_new_store_client(Queue, NewStoreSup),
- walk_queue_index(
- fun(MessageIdInStore, OldC) ->
- case rabbit_msg_store:read(MessageIdInStore, OldStoreClient) of
- {{ok, Msg}, OldC1} ->
- ok = rabbit_msg_store:write(MessageIdInStore, Msg, NewStoreClient),
- OldC1;
- _ -> OldC
- end
- end,
- OldStoreClient,
- Queue).
-
-walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
- % WARNING: State is being recovered and terminated. This can cause side effects!
+ #amqqueue{name = QueueName} = Queue,
+ %% WARNING: During scan_queue_segments queue index state is being recovered
+ %% and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
+ %% We migrate only persistent messages, which is stored in msg_store
+ %% and is not acked yet
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, false, OldC)
when is_binary(MsgId) ->
- Fun(MsgId, ClientState);
- (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
- ClientState
- end, Client, QueueName).
-
-spawn_for_each(Fun, List) ->
- Ref = erlang:make_ref(),
- Self = self(),
- Processes = lists:map(
- fun(El) ->
- spawn_link(
- fun() ->
- Fun(El),
- Self ! {ok, self(), Ref}
- end)
+ migrate_message(MsgId, OldC, NewStoreClient);
+ (_SeqId, _MsgId, _MsgProps,
+ _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
+ OldC
end,
- List),
- {Ref, Processes}.
-
-wait({Ref, Processes}) ->
- lists:foreach(
- fun(Proc) ->
- receive {ok, Proc, Ref} -> ok
- end
- end,
- Processes).
-
-get_new_store_client(Queue, NewStoreSup) ->
- Vhost = queue_vhost(Queue),
- get_new_client(NewStoreSup, Vhost).
-
-queue_vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost.
+ OldStoreClient,
+ QueueName).
+
+migrate_message(MsgId, OldC, NewC) ->
+ case rabbit_msg_store:read(MsgId, OldC) of
+ {{ok, Msg}, OldC1} ->
+ case rabbit_msg_store:contains(MsgId, NewC) of
+ false -> ok = rabbit_msg_store:write(MsgId, Msg, NewC);
+ true -> ok
+ end,
+ % TODO: maybe remove in batches?
+ ok = rabbit_msg_store:remove([MsgId], OldC1),
+ OldC1;
+ _ -> OldC
+ end;
-get_new_client(NewStoreSup, VHost) ->
- rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
+get_new_store_client(#amqqueue{name = #resource{virtual_host = VHost}},
+ NewStoreSup) ->
+ rabbit_msg_store_vhost_sup:client_init(NewStoreSup,
rabbit_guid:gen(),
- fun(_,_) -> ok end,
- fun() -> ok end,
+ fun(_,_) -> ok end,
+ fun() -> ok end,
VHost).
get_old_client(OldStore) ->
rabbit_msg_store:client_init(OldStore,
rabbit_guid:gen(),
- fun(_,_) -> ok end,
+ fun(_,_) -> ok end,
fun() -> ok end).
list_persistent_queues() ->
@@ -2785,7 +2785,7 @@ start_recovery_terms(Queues) ->
end],
{Refs, StartFunState}.
-run_old_persistent_store(Refs, StartFunState) ->
+run_old_persistent_store(Refs, StartFunState) ->
OldStoreName = ?PERSISTENT_MSG_STORE,
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
[OldStoreName, rabbit_mnesia:dir(),
@@ -2794,22 +2794,17 @@ run_old_persistent_store(Refs, StartFunState) ->
start_new_store_sup() ->
% Start persistent store sup without recovery.
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP,
+ rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
?PERSISTENT_MSG_STORE_SUP.
delete_old_store(OldStore) ->
- gen_server:stop(OldStore),
- rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).
-
-
-
-setup() ->
- application:load(rabbit),
- mnesia:start(),
- rabbit_sup:start_link(),
- rabbit:start_fhc(),
- rabbit_sup:start_restartable_child(rabbit_guid),
- rabbit_sup:start_supervisor_child(worker_pool_sup).
+ ok = rabbit_sup:stop_child(OldStore),
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
+ %% Delete old transient store as well
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]).