summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-01-06 18:03:26 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-01-06 18:03:26 +0000
commit903ca29aadf6be24cb428f8e66fc003e08833b13 (patch)
treee05f04a96a1c28ecd6587ee24a702e36c2bee90b /src
parent4de1294c7c8172943de213d106db8ad96a0971df (diff)
parente986737e0f1c0fc4e2f2c5369db7fc2eb9a7534f (diff)
downloadrabbitmq-server-git-903ca29aadf6be24cb428f8e66fc003e08833b13.tar.gz
Merge from default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mnesia.erl127
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_upgrade.erl50
-rw-r--r--src/rabbit_upgrade_functions.erl35
4 files changed, 158 insertions, 56 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 38cc82a656..c536c64f55 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -43,6 +43,8 @@
%% other mnesia-using Erlang applications, such as ejabberd
-export([create_tables/0]).
+-define(EXAMPLE_RABBIT_TABLE, rabbit_durable_exchange).
+
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -139,6 +141,11 @@ all_clustered_nodes() ->
running_clustered_nodes() ->
mnesia:system_info(running_db_nodes).
+forget_other_nodes() ->
+ Nodes = all_clustered_nodes() -- [node()],
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node) || Node <- Nodes],
+ ok.
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -159,7 +166,7 @@ nodes_of_type(Type) ->
%% Specifically, we check whether a certain table, which we know
%% will be written to disk on a disc node, is stored on disk or in
%% RAM.
- mnesia:table_info(rabbit_durable_exchange, Type).
+ mnesia:table_info(?EXAMPLE_RABBIT_TABLE, Type).
table_definitions() ->
[{rabbit_user,
@@ -381,37 +388,51 @@ init_db(ClusterNodes, Force) ->
end;
true -> ok
end,
- case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
- {[], true, [_]} ->
- %% True single disc node, attempt upgrade
- ok = wait_for_tables(),
- case rabbit_upgrade:maybe_upgrade() of
- ok -> ensure_schema_ok();
- version_not_available -> schema_ok_or_move()
- end;
- {[], true, _} ->
- %% "Master" (i.e. without config) disc node in cluster,
- %% verify schema
- ok = wait_for_tables(),
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_ok();
- {[], false, _} ->
+ case {Nodes, mnesia:system_info(use_dir)} of
+ {[], false} ->
%% Nothing there at all, start from scratch
ok = create_schema();
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, disc_copies),
- ok = create_local_table_copies(case IsDiskNode of
- true -> disc;
- false -> ram
- end),
- ensure_schema_ok()
+ {_, _} ->
+ DiscNodes = mnesia:table_info(schema, disc_copies),
+ case are_we_upgrader(DiscNodes) of
+ true ->
+ %% True single disc node, or last disc
+ %% node in cluster to shut down, attempt
+ %% upgrade
+ ok = wait_for_tables(),
+ case rabbit_upgrade:maybe_upgrade(
+ [mnesia, local],
+ fun () -> ok end,
+ fun forget_other_nodes/0) of
+ ok -> ensure_schema_ok();
+ version_not_available -> schema_ok_or_move()
+ end;
+ false ->
+ %% Subsequent node in cluster, catch up
+ %% TODO how to do this?
+ %% ensure_version_ok(
+ %% rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
+ IsDiskNode = ClusterNodes == [] orelse
+ lists:member(node(), ClusterNodes),
+ case rabbit_upgrade:maybe_upgrade(
+ [local],
+ ensure_nodes_running_fun(DiscNodes),
+ reset_fun(DiscNodes -- [node()])) of
+ ok ->
+ ok;
+ %% If we're just starting up a new node
+ %% we won't have a version
+ version_not_available ->
+ ok = rabbit_upgrade:write_version()
+ end,
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(case IsDiskNode of
+ true -> disc;
+ false -> ram
+ end),
+ ensure_schema_ok()
+ end
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -450,6 +471,52 @@ ensure_schema_ok() ->
{error, Reason} -> throw({error, {schema_invalid, Reason}})
end.
+ensure_nodes_running_fun(Nodes) ->
+ fun() ->
+ case nodes_running(Nodes) of
+ [] ->
+ exit("Cluster upgrade needed. The first node you start "
+ "should be the last node to be shut down.");
+ _ ->
+ ok
+ end
+ end.
+
+reset_fun(Nodes) ->
+ fun() ->
+ mnesia:stop(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ rabbit_misc:ensure_ok(mnesia:start(),
+ cannot_start_mnesia),
+ {ok, _} = mnesia:change_config(extra_db_nodes, Nodes),
+ ok
+ end.
+
+%% Were we the last node in the cluster to shut down or is there no cluster?
+%% The answer to this is yes if:
+%% * We are our canonical source for reading a table
+%% - If the canonical source is "nowhere" or another node, we are out of date
+%% * No other nodes are running Mnesia and have finished booting Rabbit.
+%% - Since any node will be its own canonical source once the cluster is up.
+
+are_we_upgrader(Nodes) ->
+ Where = mnesia:table_info(?EXAMPLE_RABBIT_TABLE, where_to_read),
+ Node = node(),
+ case {Where, nodes_running(Nodes)} of
+ {Node, []} -> true;
+ {_, _} -> false
+ end.
+
+nodes_running(Nodes) ->
+ [N || N <- Nodes, node_running(N)].
+
+node_running(Node) ->
+ case rpc:call(Node, application, which_applications, []) of
+ {badrpc, _} -> false;
+ Apps -> lists:keysearch(rabbit, 1, Apps) =/= false
+ end.
+
create_schema() ->
mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef86..6adcd8b0ce 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -182,7 +182,7 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, []}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
-ifdef(use_specs).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 97a07514e4..c852a0f953 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -21,7 +21,7 @@
-module(rabbit_upgrade).
--export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+-export([maybe_upgrade/3, read_version/0, write_version/0, desired_version/0]).
-include("rabbit.hrl").
@@ -33,9 +33,11 @@
-ifdef(use_specs).
-type(step() :: atom()).
+-type(scope() :: 'mnesia' | 'local').
-type(version() :: [step()]).
--spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(maybe_upgrade/3 :: ([scope()], fun (() -> 'ok'), fun (() -> 'ok'))
+ -> 'ok' | 'version_not_available').
-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
-spec(write_version/0 :: () -> 'ok').
-spec(desired_version/0 :: () -> version()).
@@ -47,24 +49,27 @@
%% Try to upgrade the schema. If no information on the existing schema
%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
%% will catch the problem.
-maybe_upgrade() ->
+maybe_upgrade(Scopes, GuardFun, UpgradeFun) ->
case read_version() of
{ok, CurrentHeads} ->
with_upgrade_graph(
- fun (G) ->
- case unknown_heads(CurrentHeads, G) of
- [] -> case upgrades_to_apply(CurrentHeads, G) of
- [] -> ok;
- Upgrades -> apply_upgrades(Upgrades)
- end;
- Unknown -> throw({error,
- {future_upgrades_found, Unknown}})
- end
- end);
+ fun (G) -> maybe_upgrade_graph(CurrentHeads, Scopes,
+ GuardFun, UpgradeFun, G) end);
{error, enoent} ->
version_not_available
end.
+maybe_upgrade_graph(CurrentHeads, Scopes, GuardFun, UpgradeFun, G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] ->
+ case upgrades_to_apply(CurrentHeads, Scopes, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades, GuardFun, UpgradeFun)
+ end;
+ Unknown ->
+ throw({error, {future_upgrades_found, Unknown}})
+ end.
+
read_version() ->
case rabbit_misc:read_term_file(schema_filename()) of
{ok, [Heads]} -> {ok, Heads};
@@ -98,16 +103,17 @@ with_upgrade_graph(Fun) ->
end.
vertices(Module, Steps) ->
- [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+ [{StepName, {Scope, {Module, StepName}}} ||
+ {StepName, Scope, _Reqs} <- Steps].
edges(_Module, Steps) ->
- [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-
+ [{Require, StepName} || {StepName, _Scope, Requires} <- Steps,
+ Require <- Requires].
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
-upgrades_to_apply(Heads, G) ->
+upgrades_to_apply(Heads, Scopes, G) ->
%% Take all the vertices which can reach the known heads. That's
%% everything we've already applied. Subtract that from all
%% vertices: that's what we have to apply.
@@ -117,15 +123,18 @@ upgrades_to_apply(Heads, G) ->
sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
- [element(2, digraph:vertex(G, StepName)) ||
- StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+ Sorted = [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))],
+ %% Only return the upgrades for the appropriate scopes
+ [Upgrade || {Scope, Upgrade} <- Sorted, lists:member(Scope, Scopes)].
heads(G) ->
lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
%% -------------------------------------------------------------------
-apply_upgrades(Upgrades) ->
+apply_upgrades(Upgrades, GuardFun, UpgradeFun) ->
+ GuardFun(),
LockFile = lock_filename(dir()),
case rabbit_misc:lock_file(LockFile) of
ok ->
@@ -140,6 +149,7 @@ apply_upgrades(Upgrades) ->
%% is not intuitive. Remove it.
ok = file:delete(lock_filename(BackupDir)),
info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
+ ok = UpgradeFun(),
[apply_upgrade(Upgrade) || Upgrade <- Upgrades],
info("Upgrades: All upgrades applied successfully~n", []),
ok = write_version(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index fc00976a45..8fee70af1f 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -24,11 +24,15 @@
-compile([export_all]).
--rabbit_upgrade({remove_user_scope, []}).
--rabbit_upgrade({hash_passwords, []}).
--rabbit_upgrade({add_ip_to_listener, []}).
--rabbit_upgrade({internal_exchanges, []}).
--rabbit_upgrade({user_to_internal_user, []}).
+-rabbit_upgrade({remove_user_scope, mnesia, []}).
+-rabbit_upgrade({hash_passwords, mnesia, []}).
+-rabbit_upgrade({add_ip_to_listener, mnesia, []}).
+-rabbit_upgrade({internal_exchanges, mnesia, []}).
+-rabbit_upgrade({user_to_internal_user, mnesia, []}).
+
+-rabbit_upgrade({one, mnesia, []}).
+-rabbit_upgrade({two, local, [one]}).
+-rabbit_upgrade({three, mnesia, [two]}).
%% -------------------------------------------------------------------
@@ -87,6 +91,27 @@ internal_exchanges() ->
|| T <- Tables ],
ok.
+one() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Hash, IsAdmin}) ->
+ {user, Username, Hash, IsAdmin, foo}
+ end,
+ [username, password_hash, is_admin, extra]).
+
+two() ->
+ ok = rabbit_misc:write_term_file(filename:join(rabbit_mnesia:dir(), "test"),
+ [test]).
+
+three() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Hash, IsAdmin, _}) ->
+ {user, Username, Hash, IsAdmin}
+ end,
+ [username, password_hash, is_admin]).
+
+
user_to_internal_user() ->
mnesia(
rabbit_user,