summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-01-04 13:39:08 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-01-04 13:39:08 +0000
commita1986871023be08bcee16cf1a55671f5bcb9612c (patch)
tree43c2252c908f61db77b58ef8044d005ffe6d53a1 /src
parent7842327c5653565ac13ea4406b41490a4f757881 (diff)
downloadrabbitmq-server-git-a1986871023be08bcee16cf1a55671f5bcb9612c.tar.gz
Sketch of how clustered upgrades might work.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mnesia.erl81
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_upgrade.erl52
-rw-r--r--src/rabbit_upgrade_functions.erl33
4 files changed, 120 insertions, 48 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 11f5e410bb..2550bdd4dd 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -35,7 +35,7 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1]).
+ forget_other_nodes/0, empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -66,6 +66,7 @@
-spec(is_clustered/0 :: () -> boolean()).
-spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(forget_other_nodes/0 :: () -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
@@ -126,8 +127,8 @@ cluster(ClusterNodes, Force) ->
%% return node to its virgin state, where it is not member of any
%% cluster, has no cluster configuration, no local database, and no
%% persisted messages
-reset() -> reset(false).
-force_reset() -> reset(true).
+reset() -> reset(all).
+force_reset() -> reset(force_all).
is_clustered() ->
RunningNodes = running_clustered_nodes(),
@@ -139,6 +140,10 @@ all_clustered_nodes() ->
running_clustered_nodes() ->
mnesia:system_info(running_db_nodes).
+forget_other_nodes() ->
+ Nodes = all_clustered_nodes() -- [node()],
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes].
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -385,32 +390,54 @@ init_db(ClusterNodes, Force) ->
{[], true, [_]} ->
%% True single disc node, attempt upgrade
ok = wait_for_tables(),
- case rabbit_upgrade:maybe_upgrade() of
+ case rabbit_upgrade:maybe_upgrade([mnesia, local]) of
ok -> ensure_schema_ok();
version_not_available -> schema_ok_or_move()
end;
{[], true, _} ->
%% "Master" (i.e. without config) disc node in cluster,
- %% verify schema
+ %% do upgrade
ok = wait_for_tables(),
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_ok();
+ case rabbit_upgrade:maybe_upgrade([mnesia, local]) of
+ ok -> ensure_schema_ok();
+ version_not_available -> schema_ok_or_move()
+ end;
{[], false, _} ->
%% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, disc_copies),
- ok = create_local_table_copies(case IsDiskNode of
- true -> disc;
- false -> ram
- end),
+ case IsDiskNode of
+ true ->
+ %% TODO test this branch ;)
+ %% TODO don't just reset every time we start up!
+ mnesia:stop(),
+ reset(mnesia),
+ mnesia:start(),
+ %% TODO what should we ensure?
+ %% ensure_version_ok(rabbit_upgrade:read_version()),
+ %% ensure_version_ok(
+ %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
+ %% TODO needed?
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(disc);
+ false ->
+ ok = wait_for_replicated_tables(),
+ %% TODO can we live without this on disc?
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(ram),
+ case rabbit_upgrade:maybe_upgrade([local]) of
+ ok ->
+ ok;
+ %% If we're just starting up a new node
+ %% we won't have a version
+ version_not_available ->
+ ok = rabbit_upgrade:write_version()
+ end
+ end,
ensure_schema_ok()
end;
{error, Reason} ->
@@ -563,12 +590,15 @@ wait_for_tables(TableNames) ->
throw({error, {failed_waiting_for_tables, Reason}})
end.
-reset(Force) ->
+%% Mode: force_all - get rid of everything unconditionally
+%% all - get rid of everything, conditional on Mnesia working
+%% mnesia - just get rid of Mnesia, leave everything else
+reset(Mode) ->
ok = ensure_mnesia_not_running(),
Node = node(),
- case Force of
- true -> ok;
- false ->
+ case Mode of
+ force_all -> ok;
+ _ ->
ok = ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
{Nodes, RunningNodes} =
@@ -583,9 +613,14 @@ reset(Force) ->
rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
cannot_delete_schema)
end,
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ case Mode of
+ mnesia ->
+ ok;
+ _ ->
+ ok = delete_cluster_nodes_config(),
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*"))
+ end,
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef86..6adcd8b0ce 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -182,7 +182,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 97a07514e4..dee08f48df 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -21,7 +21,7 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+-export([maybe_upgrade/1, read_version/0, write_version/0, desired_version/0]).
-include("rabbit.hrl").
@@ -33,9 +33,10 @@
-ifdef(use_specs).
-type(step() :: atom()).
+-type(scope() :: 'mnesia' | 'local').
-type(version() :: [step()]).
--spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(maybe_upgrade/1 :: ([scope()]) -> 'ok' | 'version_not_available').
-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
-spec(desired_version/0 :: () -> version()).
@@ -47,24 +48,28 @@
%% Try to upgrade the schema. If no information on the existing schema
%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
%% will catch the problem.
-maybe_upgrade() ->
+maybe_upgrade(Scopes) ->
case read_version() of
{ok, CurrentHeads} ->
with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end);
+ fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes, G) end);
{error, enoent} ->
version_not_available
end.
+maybe_upgrade_graph(CurrentHeads, Scopes, G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] ->
+ case upgrades_to_apply(CurrentHeads, Scopes, G) of
+ [] ->
+ ok;
+ Upgrades ->
+ apply_upgrades(Upgrades, lists:member(mnesia, Scopes))
+ end;
+ Unknown ->
+ throw({error, {future_upgrades_found, Unknown}})
+ end.
+
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
{ok, [Heads]} -> {ok, Heads};
@@ -98,16 +103,17 @@ with_upgrade_graph(Fun) ->
end.
vertices(Module, Steps) ->
- [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+ [{StepName, {Scope, {Module, StepName}}} ||
+ {StepName, Scope, _Reqs} <- Steps].
edges(_Module, Steps) ->
- [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-
+ [{Require, StepName} || {StepName, _Scope, Requires} <- Steps,
+ Require <- Requires].
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
-upgrades_to_apply(Heads, G) ->
+upgrades_to_apply(Heads, Scopes, G) ->
%% Take all the vertices which can reach the known heads. That's
%% everything we've already applied. Subtract that from all
%% vertices: that's what we have to apply.
@@ -117,15 +123,17 @@ upgrades_to_apply(Heads, G) ->
sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName)) ||
- StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+ Sorted = [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))],
+ %% Only return the upgrades for the appropriate scopes
+ [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)].
heads(G) ->
lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades) ->
+apply_upgrades(Upgrades, ForgetOthers) ->
LockFile = lock_filename(dir()),
case rabbit_misc:lock_file(LockFile) of
ok ->
@@ -140,6 +148,10 @@ apply_upgrades(Upgrades) ->
%% is not intuitive. Remove it.
ok = file:delete(lock_filename(BackupDir)),
info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
+ case ForgetOthers of
+ true -> rabbit_mnesia:forget_other_nodes();
+ _ -> ok
+ end,
[apply_upgrade(Upgrade) || Upgrade <- Upgrades],
info("Upgrades: All upgrades applied successfully~n", []),
ok = write_version(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7848c84874..43e468ff39 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -24,10 +24,14 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
--rabbit_upgrade({hash_passwords, []}).
--rabbit_upgrade({add_ip_to_listener, []}).
--rabbit_upgrade({internal_exchanges, []}).
+-rabbit_upgrade({remove_user_scope, mnesia, []}).
+-rabbit_upgrade({hash_passwords, mnesia, []}).
+-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({internal_exchanges, mnesia, []}).
+
+-rabbit_upgrade({one, mnesia, []}).
+-rabbit_upgrade({two, local, [one]}).
+-rabbit_upgrade({three, mnesia, [two]}).
%% -------------------------------------------------------------------
@@ -85,6 +89,27 @@ internal_exchanges() ->
|| T <- Tables ],
ok.
+one() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Hash, IsAdmin}) ->
+ {user, Username, Hash, IsAdmin, foo}
+ end,
+ [username, password_hash, is_admin, extra]).
+
+two() ->
+ ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"),
+ [test]).
+
+three() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Hash, IsAdmin, _}) ->
+ {user, Username, Hash, IsAdmin}
+ end,
+ [username, password_hash, is_admin]).
+
+
%%--------------------------------------------------------------------
mnesia(TableName, Fun, FieldList) ->