summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-01-26 12:19:50 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-01-26 12:19:50 +0000
commit0c9d75ada079600d96056defd5fee53a6e19b28e (patch)
treebc47d6dff81a48959544c63b066f80d2d26ad5a5 /src
parent1b2497cd867f1fa8c38845086797b44c77360c70 (diff)
parentfe9f73e59ec973740a09f2aa36778dc2d0de032d (diff)
downloadrabbitmq-server-git-0c9d75ada079600d96056defd5fee53a6e19b28e.tar.gz
Merged in default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mnesia.erl45
-rw-r--r--src/rabbit_prelaunch.erl4
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_upgrade.erl236
-rw-r--r--src/rabbit_upgrade_functions.erl10
5 files changed, 225 insertions, 72 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a9b4e17745..f7befebcd4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -20,7 +20,8 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1]).
+ empty_ram_only_tables/0, copy_db/1,
+ create_cluster_nodes_config/1, read_cluster_nodes_config/0]).
-export([table_names/0]).
@@ -54,6 +55,8 @@
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
+-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
+-spec(read_cluster_nodes_config/0 :: () -> [node()]).
-endif.
@@ -366,26 +369,19 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
- {[], true, [_]} ->
- %% True single disc node, attempt upgrade
+ case {Nodes, mnesia:system_info(use_dir)} of
+ {[], false} ->
+ %% Nothing there at all, start from scratch
+ ok = create_schema();
+ {[], _} ->
+ %% We're the first node up
ok = wait_for_tables(),
- case rabbit_upgrade:maybe_upgrade() of
+ case rabbit_upgrade:maybe_upgrade(local) of
ok -> ensure_schema_ok();
version_not_available -> schema_ok_or_move()
end;
- {[], true, _} ->
- %% "Master" (i.e. without config) disc node in cluster,
- %% verify schema
- ok = wait_for_tables(),
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_ok();
- {[], false, _} ->
- %% Nothing there at all, start from scratch
- ok = create_schema();
- {[AnotherNode|_], _, _} ->
+ {[AnotherNode|_], _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(rabbit_upgrade:read_version()),
ensure_version_ok(
rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
@@ -396,6 +392,14 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
+ case rabbit_upgrade:maybe_upgrade(local) of
+ ok ->
+ ok;
+ %% If we're just starting up a new node we won't have
+ %% a version
+ version_not_available ->
+ ok = rabbit_upgrade:write_version()
+ end,
ensure_schema_ok()
end;
{error, Reason} ->
@@ -469,14 +473,7 @@ move_db() ->
ok.
copy_db(Destination) ->
- mnesia:stop(),
- case rabbit_misc:recursive_copy(dir(), Destination) of
- ok ->
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = wait_for_tables();
- {error, E} ->
- {error, E}
- end.
+ rabbit_misc:recursive_copy(dir(), Destination).
create_tables() ->
lists:foreach(fun ({Tab, TabDef}) ->
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d9d92788e1..612aec8000 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -235,8 +235,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
- [{apply,{rabbit,prepare,[]}}, Entry];
+process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
+ [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76b1136f8b..a3a970685d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -167,7 +167,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index b0a715233a..b222845d47 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,68 +16,213 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade/1]).
+-export([read_version/0, write_version/0, desired_version/0,
+ desired_version/1]).
-include("rabbit.hrl").
-define(VERSION_FILENAME, "schema_version").
-define(LOCK_FILENAME, "schema_upgrade_lock").
+-define(SCOPES, [mnesia, local]).
%% -------------------------------------------------------------------
-ifdef(use_specs).
-type(step() :: atom()).
--type(version() :: [step()]).
+-type(version() :: [{scope(), [step()]}]).
+-type(scope() :: 'mnesia' | 'local').
--spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(maybe_upgrade_mnesia/0 :: () -> 'ok').
+-spec(maybe_upgrade/1 :: (scope()) -> 'ok' | 'version_not_available').
-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
-spec(desired_version/0 :: () -> version()).
+-spec(desired_version/1 :: (scope()) -> [step()]).
-endif.
%% -------------------------------------------------------------------
-%% Try to upgrade the schema. If no information on the existing schema
-%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
-%% will catch the problem.
-maybe_upgrade() ->
- case read_version() of
- {ok, CurrentHeads} ->
- with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end);
- {error, enoent} ->
- version_not_available
+maybe_upgrade_mnesia() ->
+ rabbit:prepare(),
+ Nodes = rabbit_mnesia:all_clustered_nodes(),
+ case upgrades_required(mnesia) of
+ [_|_] = Upgrades ->
+ case am_i_upgrader(Nodes) of
+ true -> primary_upgrade(Upgrades, Nodes);
+ false -> non_primary_upgrade(Nodes)
+ end;
+ [] ->
+ ok;
+ version_not_available ->
+ case Nodes of
+ [_] -> ok;
+ _ -> die("Cluster upgrade needed but upgrading from "
+ "< 2.1.1.~n Unfortunately you will need to "
+ "rebuild the cluster.", [])
+ end
+ end.
+
+am_i_upgrader(Nodes) ->
+ Running = nodes_running(Nodes),
+ case Running of
+ [] ->
+ case am_i_disc_node() of
+ true -> true;
+ false -> die("Cluster upgrade needed but this is a ram "
+ "node.~n Please start any of the disc nodes "
+ "first.", [])
+ end;
+ [Another|_] ->
+ ClusterVersion =
+ case rpc:call(Another,
+ rabbit_upgrade, desired_version, [mnesia]) of
+ {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version;
+ {badrpc, Reason} -> {unknown, Reason};
+ V -> V
+ end,
+ case desired_version(mnesia) of
+ ClusterVersion ->
+ %% The other node(s) have upgraded already, I am not the
+ %% upgrader
+ false;
+ MyVersion ->
+ %% The other node(s) are running an unexpected version.
+ die("Cluster upgrade needed but other nodes are "
+ "running ~p~nand I want ~p",
+ [ClusterVersion, MyVersion])
+ end
+ end.
+
+am_i_disc_node() ->
+ %% The cluster config does not list all disc nodes, but it will list us
+ %% if we're one.
+ case rabbit_mnesia:read_cluster_nodes_config() of
+ [] -> true;
+ DiscNodes -> lists:member(node(), DiscNodes)
+ end.
+
+die(Msg, Args) ->
+ %% We don't throw or exit here since that gets thrown
+ %% straight out into do_boot, generating an erl_crash.dump
+ %% and displaying any error message in a confusing way.
+ error_logger:error_msg(Msg, Args),
+ io:format("~n~n** " ++ Msg ++ " **~n~n~n", Args),
+ error_logger:logfile(close),
+ halt(1).
+
+primary_upgrade(Upgrades, Nodes) ->
+ Others = Nodes -- [node()],
+ apply_upgrades(
+ mnesia,
+ Upgrades,
+ fun () ->
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ force_tables(),
+ case Others of
+ [] -> ok;
+ _ -> info("mnesia upgrades: Breaking cluster~n", []),
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node)
+ || Node <- Others]
+ end
+ end),
+ ok.
+
+force_tables() ->
+ [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()].
+
+non_primary_upgrade(Nodes) ->
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ ok = rabbit_mnesia:create_cluster_nodes_config(Nodes),
+ write_version(mnesia),
+ ok.
+
+nodes_running(Nodes) ->
+ [N || N <- Nodes, node_running(N)].
+
+node_running(Node) ->
+ case rpc:call(Node, application, which_applications, []) of
+ {badrpc, _} -> false;
+ Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
+ end.
+
+%% -------------------------------------------------------------------
+
+maybe_upgrade(Scope) ->
+ case upgrades_required(Scope) of
+ version_not_available -> version_not_available;
+ [] -> ok;
+ Upgrades -> apply_upgrades(Scope, Upgrades,
+ fun() -> ok end)
end.
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
- {ok, [Heads]} -> {ok, Heads};
+ {ok, [V]} -> case is_new_version(V) of
+ false -> {ok, convert_old_version(V)};
+ true -> {ok, V}
+ end;
{error, _} = Err -> Err
end.
+read_version(Scope) ->
+ case read_version() of
+ {error, _} = E -> E;
+ {ok, V} -> {ok, orddict:fetch(Scope, V)}
+ end.
+
write_version() ->
ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
ok.
+write_version(Scope) ->
+ {ok, V0} = read_version(),
+ V = orddict:store(Scope, desired_version(Scope), V0),
+ ok = rabbit_misc:write_term_file(schema_filename(), [V]),
+ ok.
+
desired_version() ->
- with_upgrade_graph(fun (G) -> heads(G) end).
+ lists:foldl(
+ fun (Scope, Acc) ->
+ orddict:store(Scope, desired_version(Scope), Acc)
+ end,
+ orddict:new(), ?SCOPES).
+
+desired_version(Scope) ->
+ with_upgrade_graph(fun (G) -> heads(G) end, Scope).
+
+convert_old_version(Heads) ->
+ Locals = [add_queue_ttl],
+ V0 = orddict:new(),
+ V1 = orddict:store(mnesia, Heads -- Locals, V0),
+ orddict:store(local,
+ lists:filter(fun(H) -> lists:member(H, Locals) end, Heads),
+ V1).
%% -------------------------------------------------------------------
-with_upgrade_graph(Fun) ->
+upgrades_required(Scope) ->
+ case read_version(Scope) of
+ {ok, CurrentHeads} ->
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> upgrades_to_apply(CurrentHeads, G);
+ Unknown -> throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end, Scope);
+ {error, enoent} ->
+ version_not_available
+ end.
+
+with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
- fun vertices/2, fun edges/2,
+ fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
+ fun (Module, Steps) -> edges(Module, Steps, Scope) end,
rabbit_misc:all_module_attributes(rabbit_upgrade)) of
{ok, G} -> try
Fun(G)
@@ -92,12 +237,14 @@ with_upgrade_graph(Fun) ->
throw({error, {cycle_in_upgrade_steps, StepNames}})
end.
-vertices(Module, Steps) ->
- [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
-
-edges(_Module, Steps) ->
- [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+vertices(Module, Steps, Scope0) ->
+ [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps,
+ Scope0 == Scope1].
+edges(_Module, Steps, Scope0) ->
+ [{Require, StepName} || {StepName, Scope1, Requires} <- Steps,
+ Require <- Requires,
+ Scope0 == Scope1].
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
@@ -120,12 +267,12 @@ heads(G) ->
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades) ->
+apply_upgrades(Scope, Upgrades, Fun) ->
LockFile = lock_filename(dir()),
case rabbit_misc:lock_file(LockFile) of
ok ->
BackupDir = dir() ++ "-upgrade-backup",
- info("Upgrades: ~w to apply~n", [length(Upgrades)]),
+ info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
case rabbit_mnesia:copy_db(BackupDir) of
ok ->
%% We need to make the backup after creating the
@@ -134,12 +281,15 @@ apply_upgrades(Upgrades) ->
%% the lock file exists in the backup too, which
%% is not intuitive. Remove it.
ok = file:delete(lock_filename(BackupDir)),
- info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
- [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
- info("Upgrades: All upgrades applied successfully~n", []),
- ok = write_version(),
+ info("~s upgrades: Mnesia dir backed up to ~p~n",
+ [Scope, BackupDir]),
+ Fun(),
+ [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades],
+ info("~s upgrades: All upgrades applied successfully~n",
+ [Scope]),
+ ok = write_version(Scope),
ok = rabbit_misc:recursive_delete([BackupDir]),
- info("Upgrades: Mnesia backup removed~n", []),
+ info("~s upgrades: Mnesia backup removed~n", [Scope]),
ok = file:delete(LockFile);
{error, E} ->
%% If we can't backup, the upgrade hasn't started
@@ -152,8 +302,8 @@ apply_upgrades(Upgrades) ->
throw({error, previous_upgrade_failed})
end.
-apply_upgrade({M, F}) ->
- info("Upgrades: Applying ~w:~w~n", [M, F]),
+apply_upgrade(Scope, {M, F}) ->
+ info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]),
ok = apply(M, F, []).
%% -------------------------------------------------------------------
@@ -167,3 +317,9 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
info(Msg, Args) -> error_logger:info_msg(Msg, Args).
+
+is_new_version(Version) ->
+ is_list(Version) andalso
+ length(Version) > 0 andalso
+ lists:all(fun(Item) -> is_tuple(Item) andalso size(Item) == 2 end,
+ Version).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 68b88b3e45..22c8262154 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -20,11 +20,11 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
--rabbit_upgrade({hash_passwords, []}).
--rabbit_upgrade({add_ip_to_listener, []}).
--rabbit_upgrade({internal_exchanges, []}).
--rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
+-rabbit_upgrade({remove_user_scope, mnesia, []}).
+-rabbit_upgrade({hash_passwords, mnesia, []}).
+-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({internal_exchanges, mnesia, []}).
+-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------