diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-01-26 12:19:50 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-01-26 12:19:50 +0000 |
| commit | 0c9d75ada079600d96056defd5fee53a6e19b28e (patch) | |
| tree | bc47d6dff81a48959544c63b066f80d2d26ad5a5 /src | |
| parent | 1b2497cd867f1fa8c38845086797b44c77360c70 (diff) | |
| parent | fe9f73e59ec973740a09f2aa36778dc2d0de032d (diff) | |
| download | rabbitmq-server-git-0c9d75ada079600d96056defd5fee53a6e19b28e.tar.gz | |
Merged in default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mnesia.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 236 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 10 |
5 files changed, 225 insertions, 72 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a9b4e17745..f7befebcd4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,8 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0]). -export([table_names/0]). @@ -54,6 +55,8 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). -endif. @@ -366,26 +369,19 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> + %% Nothing there at all, start from scratch + ok = create_schema(); + {[], _} -> + %% We're the first node up ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade() of + case rabbit_upgrade:maybe_upgrade(local) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ok = wait_for_tables(), - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); - {[], false, _} -> - %% Nothing there at all, start from scratch - ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), ensure_version_ok( rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse @@ -396,6 +392,14 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), + case rabbit_upgrade:maybe_upgrade(local) of + ok -> + ok; + %% If we're just starting up a new node we won't have + %% a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, ensure_schema_ok() end; {error, Reason} -> @@ -469,14 +473,7 @@ move_db() -> ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); - {error, E} -> - {error, E} - end. + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d9d92788e1..612aec8000 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,8 +235,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> + [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76b1136f8b..a3a970685d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -167,7 +167,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b0a715233a..b222845d47 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,68 +16,213 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade/1]). +-export([read_version/0, write_version/0, desired_version/0, + desired_version/1]). -include("rabbit.hrl"). -define(VERSION_FILENAME, "schema_version"). -define(LOCK_FILENAME, "schema_upgrade_lock"). +-define(SCOPES, [mnesia, local]). %% ------------------------------------------------------------------- -ifdef(use_specs). -type(step() :: atom()). --type(version() :: [step()]). +-type(version() :: [{scope(), [step()]}]). +-type(scope() :: 'mnesia' | 'local'). --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). +-spec(maybe_upgrade/1 :: (scope()) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). +-spec(desired_version/1 :: (scope()) -> [step()]). -endif. %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. -maybe_upgrade() -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); - {error, enoent} -> - version_not_available +maybe_upgrade_mnesia() -> + rabbit:prepare(), + Nodes = rabbit_mnesia:all_clustered_nodes(), + case upgrades_required(mnesia) of + [_|_] = Upgrades -> + case am_i_upgrader(Nodes) of + true -> primary_upgrade(Upgrades, Nodes); + false -> non_primary_upgrade(Nodes) + end; + [] -> + ok; + version_not_available -> + case Nodes of + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~n Unfortunately you will need to " + "rebuild the cluster.", []) + end + end. + +am_i_upgrader(Nodes) -> + Running = nodes_running(Nodes), + case Running of + [] -> + case am_i_disc_node() of + true -> true; + false -> die("Cluster upgrade needed but this is a ram " + "node.~n Please start any of the disc nodes " + "first.", []) + end; + [Another|_] -> + ClusterVersion = + case rpc:call(Another, + rabbit_upgrade, desired_version, [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> unknown_old_version; + {badrpc, Reason} -> {unknown, Reason}; + V -> V + end, + case desired_version(mnesia) of + ClusterVersion -> + %% The other node(s) have upgraded already, I am not the + %% upgrader + false; + MyVersion -> + %% The other node(s) are running an unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end + end. + +am_i_disc_node() -> + %% The cluster config does not list all disc nodes, but it will list us + %% if we're one. + case rabbit_mnesia:read_cluster_nodes_config() of + [] -> true; + DiscNodes -> lists:member(node(), DiscNodes) + end. + +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n** " ++ Msg ++ " **~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], + apply_upgrades( + mnesia, + Upgrades, + fun () -> + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), + ok. + +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. + +non_primary_upgrade(Nodes) -> + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + ok = rabbit_mnesia:create_cluster_nodes_config(Nodes), + write_version(mnesia), + ok. + +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. + +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. + +%% ------------------------------------------------------------------- + +maybe_upgrade(Scope) -> + case upgrades_required(Scope) of + version_not_available -> version_not_available; + [] -> ok; + Upgrades -> apply_upgrades(Scope, Upgrades, + fun() -> ok end) end. read_version() -> case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; + {ok, [V]} -> case is_new_version(V) of + false -> {ok, convert_old_version(V)}; + true -> {ok, V} + end; {error, _} = Err -> Err end. +read_version(Scope) -> + case read_version() of + {error, _} = E -> E; + {ok, V} -> {ok, orddict:fetch(Scope, V)} + end. + write_version() -> ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), ok. +write_version(Scope) -> + {ok, V0} = read_version(), + V = orddict:store(Scope, desired_version(Scope), V0), + ok = rabbit_misc:write_term_file(schema_filename(), [V]), + ok. + desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). + lists:foldl( + fun (Scope, Acc) -> + orddict:store(Scope, desired_version(Scope), Acc) + end, + orddict:new(), ?SCOPES). + +desired_version(Scope) -> + with_upgrade_graph(fun (G) -> heads(G) end, Scope). + +convert_old_version(Heads) -> + Locals = [add_queue_ttl], + V0 = orddict:new(), + V1 = orddict:store(mnesia, Heads -- Locals, V0), + orddict:store(local, + lists:filter(fun(H) -> lists:member(H, Locals) end, Heads), + V1). %% ------------------------------------------------------------------- -with_upgrade_graph(Fun) -> +upgrades_required(Scope) -> + case read_version(Scope) of + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> upgrades_to_apply(CurrentHeads, G); + Unknown -> throw({error, + {future_upgrades_found, Unknown}}) + end + end, Scope); + {error, enoent} -> + version_not_available + end. + +with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of {ok, G} -> try Fun(G) @@ -92,12 +237,14 @@ with_upgrade_graph(Fun) -> throw({error, {cycle_in_upgrade_steps, StepNames}}) end. -vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. - -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. @@ -120,12 +267,12 @@ heads(G) -> %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Scope, Upgrades, Fun) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), case rabbit_mnesia:copy_db(BackupDir) of ok -> %% We need to make the backup after creating the @@ -134,12 +281,15 @@ apply_upgrades(Upgrades) -> %% the lock file exists in the backup too, which %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), + info("~s upgrades: Mnesia dir backed up to ~p~n", + [Scope, BackupDir]), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", + [Scope]), + ok = write_version(Scope), ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), + info("~s upgrades: Mnesia backup removed~n", [Scope]), ok = file:delete(LockFile); {error, E} -> %% If we can't backup, the upgrade hasn't started @@ -152,8 +302,8 @@ apply_upgrades(Upgrades) -> throw({error, previous_upgrade_failed}) end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- @@ -167,3 +317,9 @@ lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been %% started yet info(Msg, Args) -> error_logger:info_msg(Msg, Args). + +is_new_version(Version) -> + is_list(Version) andalso + length(Version) > 0 andalso + lists:all(fun(Item) -> is_tuple(Item) andalso size(Item) == 2 end, + Version). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e45..22c8262154 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -20,11 +20,11 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). %% ------------------------------------------------------------------- |
