diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-01-04 13:39:08 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-01-04 13:39:08 +0000 |
| commit | a1986871023be08bcee16cf1a55671f5bcb9612c (patch) | |
| tree | 43c2252c908f61db77b58ef8044d005ffe6d53a1 /src | |
| parent | 7842327c5653565ac13ea4406b41490a4f757881 (diff) | |
| download | rabbitmq-server-git-a1986871023be08bcee16cf1a55671f5bcb9612c.tar.gz | |
Sketch of how clustered upgrades might work.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mnesia.erl | 81 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 33 |
4 files changed, 120 insertions, 48 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 11f5e410bb..2550bdd4dd 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -35,7 +35,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + forget_other_nodes/0, empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -66,6 +66,7 @@ -spec(is_clustered/0 :: () -> boolean()). -spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(forget_other_nodes/0 :: () -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). @@ -126,8 +127,8 @@ cluster(ClusterNodes, Force) -> %% return node to its virgin state, where it is not member of any %% cluster, has no cluster configuration, no local database, and no %% persisted messages -reset() -> reset(false). -force_reset() -> reset(true). +reset() -> reset(all). +force_reset() -> reset(force_all). is_clustered() -> RunningNodes = running_clustered_nodes(), @@ -139,6 +140,10 @@ all_clustered_nodes() -> running_clustered_nodes() -> mnesia:system_info(running_db_nodes). +forget_other_nodes() -> + Nodes = all_clustered_nodes() -- [node()], + [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes]. + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -385,32 +390,54 @@ init_db(ClusterNodes, Force) -> {[], true, [_]} -> %% True single disc node, attempt upgrade ok = wait_for_tables(), - case rabbit_upgrade:maybe_upgrade() of + case rabbit_upgrade:maybe_upgrade([mnesia, local]) of ok -> ensure_schema_ok(); version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, - %% verify schema + %% do upgrade ok = wait_for_tables(), - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); + case rabbit_upgrade:maybe_upgrade([mnesia, local]) of + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() + end; {[], false, _} -> %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, disc_copies), - ok = create_local_table_copies(case IsDiskNode of - true -> disc; - false -> ram - end), + case IsDiskNode of + true -> + %% TODO test this branch ;) + %% TODO don't just reset every time we start up! + mnesia:stop(), + reset(mnesia), + mnesia:start(), + %% TODO what should we ensure? + %% ensure_version_ok(rabbit_upgrade:read_version()), + %% ensure_version_ok( + %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + %% TODO needed? + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(disc); + false -> + ok = wait_for_replicated_tables(), + %% TODO can we live without this on disc? + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(ram), + case rabbit_upgrade:maybe_upgrade([local]) of + ok -> + ok; + %% If we're just starting up a new node + %% we won't have a version + version_not_available -> + ok = rabbit_upgrade:write_version() + end + end, ensure_schema_ok() end; {error, Reason} -> @@ -563,12 +590,15 @@ wait_for_tables(TableNames) -> throw({error, {failed_waiting_for_tables, Reason}}) end. -reset(Force) -> +%% Mode: force_all - get rid of everything unconditionally +%% all - get rid of everything, conditional on Mnesia working +%% mnesia - just get rid of Mnesia, leave everything else +reset(Mode) -> ok = ensure_mnesia_not_running(), Node = node(), - case Force of - true -> ok; - false -> + case Mode of + force_all -> ok; + _ -> ok = ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = @@ -583,9 +613,14 @@ reset(Force) -> rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), cannot_delete_schema) end, - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), + case Mode of + mnesia -> + ok; + _ -> + ok = delete_cluster_nodes_config(), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")) + end, ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef86..6adcd8b0ce 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -182,7 +182,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 97a07514e4..dee08f48df 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -21,7 +21,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade/1, read_version/0, write_version/0, desired_version/0]). -include("rabbit.hrl"). @@ -33,9 +33,10 @@ -ifdef(use_specs). -type(step() :: atom()). +-type(scope() :: 'mnesia' | 'local'). -type(version() :: [step()]). --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available'). -spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). -spec(write_version/0 :: () -> 'ok'). -spec(desired_version/0 :: () -> version()). @@ -47,24 +48,28 @@ %% Try to upgrade the schema. If no information on the existing schema %% could be found, do nothing. rabbit_mnesia:check_schema_integrity() %% will catch the problem. -maybe_upgrade() -> +maybe_upgrade(Scopes) -> case read_version() of {ok, CurrentHeads} -> with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); + fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, G) end); {error, enoent} -> version_not_available end. +maybe_upgrade_graph(CurrentHeads, Scopes, G) -> + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, Scopes, G) of + [] -> + ok; + Upgrades -> + apply_upgrades(Upgrades, lists:member(mnesia, Scopes)) + end; + Unknown -> + throw({error, {future_upgrades_found, Unknown}}) + end. + read_version() -> case rabbit_misc:read_term_file(schema_filename()) of {ok, [Heads]} -> {ok, Heads}; @@ -98,16 +103,17 @@ with_upgrade_graph(Fun) -> end. vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + [{StepName, {Scope, {Module, StepName}}} || + {StepName, Scope, _Reqs} <- Steps]. edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - + [{Require, StepName} || {StepName, _Scope, Requires} <- Steps, + Require <- Requires]. unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. -upgrades_to_apply(Heads, G) -> +upgrades_to_apply(Heads, Scopes, G) -> %% Take all the vertices which can reach the known heads. That's %% everything we've already applied. Subtract that from all %% vertices: that's what we have to apply. @@ -117,15 +123,17 @@ upgrades_to_apply(Heads, G) -> sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + Sorted = [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))], + %% Only return the upgrades for the appropriate scopes + [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)]. heads(G) -> lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> +apply_upgrades(Upgrades, ForgetOthers) -> LockFile = lock_filename(dir()), case rabbit_misc:lock_file(LockFile) of ok -> @@ -140,6 +148,10 @@ apply_upgrades(Upgrades) -> %% is not intuitive. Remove it. ok = file:delete(lock_filename(BackupDir)), info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + case ForgetOthers of + true -> rabbit_mnesia:forget_other_nodes(); + _ -> ok + end, [apply_upgrade(Upgrade) || Upgrade <- Upgrades], info("Upgrades: All upgrades applied successfully~n", []), ok = write_version(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 7848c84874..43e468ff39 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -24,10 +24,14 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). + +-rabbit_upgrade({one, mnesia, []}). +-rabbit_upgrade({two, local, [one]}). +-rabbit_upgrade({three, mnesia, [two]}). %% ------------------------------------------------------------------- @@ -85,6 +89,27 @@ internal_exchanges() -> || T <- Tables ], ok. +one() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin}) -> + {user, Username, Hash, IsAdmin, foo} + end, + [username, password_hash, is_admin, extra]). + +two() -> + ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"), + [test]). + +three() -> + mnesia( + rabbit_user, + fun ({user, Username, Hash, IsAdmin, _}) -> + {user, Username, Hash, IsAdmin} + end, + [username, password_hash, is_admin]). + + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> |
