diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-01-06 18:03:26 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-01-06 18:03:26 +0000 |
| commit | 903ca29aadf6be24cb428f8e66fc003e08833b13 (patch) | |
| tree | e05f04a96a1c28ecd6587ee24a702e36c2bee90b /src | |
| parent | 4de1294c7c8172943de213d106db8ad96a0971df (diff) | |
| parent | e986737e0f1c0fc4e2f2c5369db7fc2eb9a7534f (diff) | |
| download | rabbitmq-server-git-903ca29aadf6be24cb428f8e66fc003e08833b13.tar.gz | |
Merge from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mnesia.erl | 127 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 35 |
4 files changed, 158 insertions, 56 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 38cc82a656..c536c64f55 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -43,6 +43,8 @@ %% other mnesia-using Erlang applications, such as ejabberd -export([create_tables/0]). +-define(EXAMPLE_RABBIT_TABLE, rabbit_durable_exchange). + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -139,6 +141,11 @@ all_clustered_nodes() -> running_clustered_nodes() -> mnesia:system_info(running_db_nodes). +forget_other_nodes() -> + Nodes = all_clustered_nodes() -- [node()], + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes], + ok. + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -159,7 +166,7 @@ nodes_of_type(Type) -> %% Specifically, we check whether a certain table, which we know %% will be written to disk on a disc node, is stored on disk or in %% RAM. - mnesia:table_info(rabbit_durable_exchange, Type). + mnesia:table_info(?EXAMPLE_RABBIT_TABLE, Type). table_definitions() -> [{rabbit_user, @@ -381,37 +388,51 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade - ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_ok(); - version_not_available -> schema_ok_or_move() - end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ok = wait_for_tables(), - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); - {[], false, _} -> + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _, _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), - ensure_schema_ok() + {_, _} -> + DiscNodes = mnesia:table_info(schema, disc_copies), + case are_we_upgrader(DiscNodes) of + true -> + %% True single disc node, or last disc + %% node in cluster to shut down, attempt + %% upgrade + ok = wait_for_tables(), + case rabbit_upgrade:maybe_upgrade( + [mnesia, local], + fun () -> ok end, + fun forget_other_nodes/0) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; + false -> + %% Subsequent node in cluster, catch up + %% TODO how to do this? + %% ensure_version_ok( + %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + case rabbit_upgrade:maybe_upgrade( + [local], + ensure_nodes_running_fun(DiscNodes), + reset_fun(DiscNodes -- [node()])) of + ok -> + ok; + %% If we're just starting up a new node + %% we won't have a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end, + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end), + ensure_schema_ok() + end end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -450,6 +471,52 @@ ensure_schema_ok() -> {error, Reason} -> throw({error, {schema_invalid, Reason}}) end. +ensure_nodes_running_fun(Nodes) -> + fun() -> + case nodes_running(Nodes) of + [] -> + exit("Cluster upgrade needed. The first node you start " + "should be the last node to be shut down."); + _ -> + ok + end + end. + +reset_fun(Nodes) -> + fun() -> + mnesia:stop(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + rabbit_misc:ensure_ok(mnesia:start(), + cannot_start_mnesia), + {ok, _} = mnesia:change_config(extra_db_nodes, Nodes), + ok + end. + +%% Were we the last node in the cluster to shut down or is there no cluster? +%% The answer to this is yes if: +%% * We are our canonical source for reading a table +%% - If the canonical source is "nowhere" or another node, we are out of date +%% * No other nodes are running Mnesia and have finished booting Rabbit. +%% - Since any node will be its own canonical source once the cluster is up. + +are_we_upgrader(Nodes) -> + Where = mnesia:table_info(?EXAMPLE_RABBIT_TABLE, where_to_read), + Node = node(), + case {Where, nodes_running(Nodes)} of + {Node, []} -> true; + {_, _} -> false + end. + +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. + +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef86..6adcd8b0ce 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -182,7 +182,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 97a07514e4..c852a0f953 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/3, read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -33,9 +33,11 @@ -ifdef(use_specs). -type(step() :: atom()). +-type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/3 :: ([scope()], fun (() -> 'ok'), fun (() -> 'ok')) + -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -47,24 +49,27 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade() -> +maybe_upgrade(Scopes, GuardFun, UpgradeFun) -> case read_version() of {ok, CurrentHeads} -> with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); + fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, + GuardFun, UpgradeFun, G) end); {error, enoent} -> version_not_available end. +maybe_upgrade_graph(CurrentHeads, Scopes, GuardFun, UpgradeFun, G) -> + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, Scopes, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades, GuardFun, UpgradeFun) + end; + Unknown -> + throw({error, {future_upgrades_found, Unknown}}) + end. + read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [Heads]} -> {ok, Heads}; @@ -98,16 +103,17 @@ with_upgrade_graph(Fun) -> end. vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + [{StepName, {Scope, {Module, StepName}}} || + {StepName, Scope, _Reqs} <- Steps]. edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - + [{Require, StepName} || {StepName, _Scope, Requires} <- Steps, + Require <- Requires]. unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. -upgrades_to_apply(Heads, G) -> +upgrades_to_apply(Heads, Scopes, G) -> %% Take all the vertices which can reach the known heads. That's %% everything we've already applied. Subtract that from all %% vertices: that's what we have to apply. @@ -117,15 +123,18 @@ upgrades_to_apply(Heads, G) -> sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + Sorted = [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))], + %% Only return the upgrades for the appropriate scopes + [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)]. heads(G) -> lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Upgrades, GuardFun, UpgradeFun) -> + GuardFun(), LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -140,6 +149,7 @@ apply_upgrades(Upgrades) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + ok = UpgradeFun(), [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index fc00976a45..8fee70af1f 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -24,11 +24,15 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, []}). + +-rabbit_upgrade({one, mnesia, []}). +-rabbit_upgrade({two, local, [one]}). +-rabbit_upgrade({three, mnesia, [two]}). %% ------------------------------------------------------------------- @@ -87,6 +91,27 @@ internal_exchanges() -> || T <- Tables ], ok. +one() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin}) -> + {user, Username, Hash, IsAdmin, foo} + end, + [username, password_hash, is_admin, extra]). + +two() -> + ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), + [test]). + +three() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin, _}) -> + {user, Username, Hash, IsAdmin} + end, + [username, password_hash, is_admin]). + + user_to_internal_user() -> mnesia( rabbit_user, |
