summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_control.erl27
-rw-r--r--src/rabbit_mnesia.erl154
2 files changed, 133 insertions, 48 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 478155a1ef..e51c4c6a03 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -182,18 +182,23 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- io:format("'cluster' is deprecated, please us 'join_cluster'.~n"),
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- DiscNode = rabbit_mnesia:should_be_disc_node(ClusterNodes),
- Inform("Clustering node ~p with ~p", [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNodes, DiscNode]);
-
-action(join_cluster, Node, ClusterNodeSs, Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
- Inform("Clustering node ~p with ~p", [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNodes, DiscNode]);
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_node_type, [ram]);
+action(change_node_type, Node, ["disc"], _Opts, Inform) ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_node_type, [disc]);
+
+action(recluster, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Re-clustering ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8cde170287..741392c8ee 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -24,7 +24,8 @@
wait_for_tables/1, initialize_cluster_nodes_status/0,
write_cluster_nodes_status/1, read_cluster_nodes_status/0,
update_cluster_nodes_status/0, is_disc_node/0, on_node_down/1,
- on_node_up/1, should_be_disc_node/1]).
+ on_node_up/1, should_be_disc_node/1, change_node_type/1,
+ recluster/1]).
-export([table_names/0]).
@@ -105,38 +106,40 @@ init() ->
%% actually cluster it. The nodes provided will be used to find out about the
%% nodes in the cluster.
%% This function will fail if:
-%%
+%%
%% * The node is currently the only disc node of its cluster
%% * We can't connect to any of the nodes provided
%% * The node is currently already clustered with the cluster of the nodes
%% provided
-%%
+%%
%% Note that we make no attempt to verify that the nodes provided are all in the
%% same cluster, we simply pick the first online node and we cluster to its
%% cluster.
-join_cluster(DiscoveryNodes, WantDiscNode) ->
+join_cluster(DiscoveryNode, WantDiscNode) ->
case is_disc_and_clustered() andalso is_only_disc_node(node()) of
true -> throw({error,
{standalone_ram_node,
"You can't cluster a node if it's the only "
- "disc node in its existing cluster."}});
+ "disc node in its existing cluster. If new nodes "
+ "joined while this node was offline, use \"recluster\" "
+ "to add them manually"}});
_ -> ok
end,
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- ProperDiscoveryNodes = DiscoveryNodes -- [node()],
- {ClusterNodes, DiscNodes, _} =
- case discover_cluster(ProperDiscoveryNodes) of
+ Status = {ClusterNodes, DiscNodes, _} =
+ case discover_cluster(DiscoveryNode) of
{ok, Res} -> Res;
{error, Reason} -> throw({error, Reason})
end,
-
+
case lists:member(node(), ClusterNodes) of
true -> throw({error, {already_clustered,
"You are already clustered with the nodes you "
- "have selected."}});
+ "have selected"}}),
+ write_cluster_nodes_status(Status);
false -> ok
end,
@@ -149,12 +152,7 @@ join_cluster(DiscoveryNodes, WantDiscNode) ->
rabbit_misc:local_info_msg("Clustering with ~p~s~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(DiscNodes, WantDiscNode, false)
- after
- stop_mnesia()
- end,
+ ok = start_and_init_db(DiscNodes, WantDiscNode, false),
ok.
@@ -212,6 +210,63 @@ reset(Force) ->
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
+change_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> throw({error, {not_clustered,
+ "Non-clustered nodes can only be disc nodes"}});
+ true -> ok
+ end,
+
+ check_cluster_consistency(),
+ DiscoveryNodes = all_clustered_nodes(),
+ ClusterNodes =
+ case discover_cluster(DiscoveryNodes) of
+ {ok, {ClusterNodes0, _, _}} ->
+ ClusterNodes0;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_cluster,
+ "Could not connect to the cluster nodes present in "
+ "this node status file. If the cluster has changed, "
+ "you can use the \"recluster\" command to point to the "
+ "new cluster nodes"}})
+ end,
+
+ WantDiscNode = case Type of
+ ram -> false;
+ disc -> true
+ end,
+
+ ok = start_and_init_db(ClusterNodes, WantDiscNode, false),
+
+ ok.
+
+recluster(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ ClusterNodes =
+ case discover_cluster(DiscoveryNode) of
+ {ok, {ClusterNodes0, _, _}} ->
+ ClusterNodes0;
+ {error, _Reason} ->
+ throw({error,
+ {cannot_connect_to_node,
+ "Could not connect to the cluster node provided"}})
+ end,
+
+ case lists:member(node(), ClusterNodes) of
+ true -> start_and_init_db(ClusterNodes, is_disc_node(), false);
+ false -> throw({error,
+ {inconsistent_cluster,
+ "The nodes provided do not have this node as part of "
+ "the cluster"}})
+ end,
+
+ ok.
+
%%----------------------------------------------------------------------------
%% Queries
%%----------------------------------------------------------------------------
@@ -238,21 +293,21 @@ is_db_empty() ->
table_names()).
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] /= Nodes andalso [] /= Nodes.
is_disc_and_clustered() ->
is_disc_node() andalso is_clustered().
%% The situation with functions that retrieve the nodes in the cluster is
%% messy.
-%%
+%%
%% * If we want to get all the nodes or the running nodes, we can do that
%% while mnesia is offline *if* the node is a disc node. If the node is ram,
%% the result will always be [node()].
%% * If we want to get the cluster disc nodes (running or not), we need to
%% start mnesia in any case.
-%%
+%%
%% In the following functions we try to get the data from mnesia when we can,
%% otherwise we fall back to the cluster status file.
@@ -376,13 +431,19 @@ init_db(ClusterNodes, WantDiscNode, Force, Upgrade) ->
%% Subsequent node in cluster, catch up
ensure_version_ok(
rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} = case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
+
+ %% The sequence in which we delete the schema and then the
+ %% other tables is important: if we delete the schema first
+ %% when moving to RAM mnesia will loudly complain since it
+ %% doesn't make much sense to do that. But when moving to
+ %% disc, we need to move the schema first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end,
%% Write the status now that mnesia is running and clustered
update_cluster_nodes_status(),
@@ -544,21 +605,21 @@ check_cluster_consistency() ->
[ThisNode, Node, Node])}})
end
end
- end, rabbit_mnesia:all_clustered_nodes()).
+ end, all_clustered_nodes()).
%%----------------------------------------------------------------------------
%% Cluster status file functions
%%----------------------------------------------------------------------------
%% The cluster node status file contains all we need to know about the cluster:
-%%
+%%
%% * All the clustered nodes
%% * The disc nodes
%% * The running nodes.
%%
%% If the current node is a disc node it will be included in the disc nodes
%% list.
-%%
+%%
%% We strive to keep the file up to date and we rely on this assumption in
%% various situations. Obviously when mnesia is offline the information we have
%% will be outdated, but it can't be otherwise.
@@ -643,14 +704,24 @@ on_node_down(Node) ->
%% Internal helpers
%%--------------------------------------------------------------------
-discover_cluster([]) ->
- {error, {cannot_discover_cluster,
- "The cluster nodes provided are either offline or not running."}};
-discover_cluster([Node | Nodes]) ->
- case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
- {badrpc, _Reason} -> discover_cluster(Nodes);
- error -> discover_cluster(Nodes);
- {ok, Res} -> {ok, Res}
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end,
+ {error, {cannot_discover_cluster,
+ "The nodes provided is either offline or not running"}},
+ Nodes);
+discover_cluster(Node) ->
+ case Node =:= node() of
+ true ->
+ {error, {cannot_discover_cluster,
+ "You provided the current node as node to cluster with"}};
+ false ->
+ case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
+ {badrpc, _Reason} -> discover_cluster([]);
+ error -> discover_cluster([]);
+ {ok, Res} -> {ok, Res}
+ end
end.
nodes_of_type(Type) ->
@@ -968,3 +1039,12 @@ start_mnesia() ->
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+start_and_init_db(ClusterNodes, WantDiscNode, Force) ->
+ mnesia:start(),
+ try
+ ok = init_db(ClusterNodes, WantDiscNode, Force)
+ after
+ mnesia:stop()
+ end,
+ ok.