diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_control.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 154 |
2 files changed, 133 insertions, 48 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 478155a1ef..e51c4c6a03 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -182,18 +182,23 @@ action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> - io:format("'cluster' is deprecated, please us 'join_cluster'.~n"), - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - DiscNode = rabbit_mnesia:should_be_disc_node(ClusterNodes), - Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNodes, DiscNode]); - -action(join_cluster, Node, ClusterNodeSs, Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), +action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), DiscNode = not proplists:get_bool(?RAM_OPT, Opts), - Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNodes, DiscNode]); + Inform("Clustering node ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]); + +action(change_node_type, Node, ["ram"], _Opts, Inform) -> + Inform("Turning ~p into a ram node", [Node]), + rpc_call(Node, rabbit_mnesia, change_node_type, [ram]); +action(change_node_type, Node, ["disc"], _Opts, Inform) -> + Inform("Turning ~p into a disc node", [Node]), + rpc_call(Node, rabbit_mnesia, change_node_type, [disc]); + +action(recluster, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Re-clustering ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, recluster, [ClusterNode]); action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8cde170287..741392c8ee 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -24,7 +24,8 @@ wait_for_tables/1, initialize_cluster_nodes_status/0, write_cluster_nodes_status/1, read_cluster_nodes_status/0, update_cluster_nodes_status/0, is_disc_node/0, on_node_down/1, - on_node_up/1, should_be_disc_node/1]). + on_node_up/1, should_be_disc_node/1, change_node_type/1, + recluster/1]). -export([table_names/0]). @@ -105,38 +106,40 @@ init() -> %% actually cluster it. The nodes provided will be used to find out about the %% nodes in the cluster. %% This function will fail if: -%% +%% %% * The node is currently the only disc node of its cluster %% * We can't connect to any of the nodes provided %% * The node is currently already clustered with the cluster of the nodes %% provided -%% +%% %% Note that we make no attempt to verify that the nodes provided are all in the %% same cluster, we simply pick the first online node and we cluster to its %% cluster. -join_cluster(DiscoveryNodes, WantDiscNode) -> +join_cluster(DiscoveryNode, WantDiscNode) -> case is_disc_and_clustered() andalso is_only_disc_node(node()) of true -> throw({error, {standalone_ram_node, "You can't cluster a node if it's the only " - "disc node in its existing cluster."}}); + "disc node in its existing cluster. If new nodes " + "joined while this node was offline, use \"recluster\" " + "to add them manually"}}); _ -> ok end, ensure_mnesia_not_running(), ensure_mnesia_dir(), - ProperDiscoveryNodes = DiscoveryNodes -- [node()], - {ClusterNodes, DiscNodes, _} = - case discover_cluster(ProperDiscoveryNodes) of + Status = {ClusterNodes, DiscNodes, _} = + case discover_cluster(DiscoveryNode) of {ok, Res} -> Res; {error, Reason} -> throw({error, Reason}) end, - + case lists:member(node(), ClusterNodes) of true -> throw({error, {already_clustered, "You are already clustered with the nodes you " - "have selected."}}); + "have selected"}}), + write_cluster_nodes_status(Status); false -> ok end, @@ -149,12 +152,7 @@ join_cluster(DiscoveryNodes, WantDiscNode) -> rabbit_misc:local_info_msg("Clustering with ~p~s~n", [ClusterNodes]), %% Join the cluster - start_mnesia(), - try - ok = init_db(DiscNodes, WantDiscNode, false) - after - stop_mnesia() - end, + ok = start_and_init_db(DiscNodes, WantDiscNode, false), ok. @@ -212,6 +210,63 @@ reset(Force) -> ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. +change_node_type(Type) -> + ensure_mnesia_dir(), + ensure_mnesia_not_running(), + case is_clustered() of + false -> throw({error, {not_clustered, + "Non-clustered nodes can only be disc nodes"}}); + true -> ok + end, + + check_cluster_consistency(), + DiscoveryNodes = all_clustered_nodes(), + ClusterNodes = + case discover_cluster(DiscoveryNodes) of + {ok, {ClusterNodes0, _, _}} -> + ClusterNodes0; + {error, _Reason} -> + throw({error, + {cannot_connect_to_cluster, + "Could not connect to the cluster nodes present in " + "this node status file. If the cluster has changed, " + "you can use the \"recluster\" command to point to the " + "new cluster nodes"}}) + end, + + WantDiscNode = case Type of + ram -> false; + disc -> true + end, + + ok = start_and_init_db(ClusterNodes, WantDiscNode, false), + + ok. + +recluster(DiscoveryNode) -> + ensure_mnesia_not_running(), + ensure_mnesia_dir(), + + ClusterNodes = + case discover_cluster(DiscoveryNode) of + {ok, {ClusterNodes0, _, _}} -> + ClusterNodes0; + {error, _Reason} -> + throw({error, + {cannot_connect_to_node, + "Could not connect to the cluster node provided"}}) + end, + + case lists:member(node(), ClusterNodes) of + true -> start_and_init_db(ClusterNodes, is_disc_node(), false); + false -> throw({error, + {inconsistent_cluster, + "The nodes provided do not have this node as part of " + "the cluster"}}) + end, + + ok. + %%---------------------------------------------------------------------------- %% Queries %%---------------------------------------------------------------------------- @@ -238,21 +293,21 @@ is_db_empty() -> table_names()). is_clustered() -> - RunningNodes = running_clustered_nodes(), - [node()] /= RunningNodes andalso [] /= RunningNodes. + Nodes = all_clustered_nodes(), + [node()] /= Nodes andalso [] /= Nodes. is_disc_and_clustered() -> is_disc_node() andalso is_clustered(). %% The situation with functions that retrieve the nodes in the cluster is %% messy. -%% +%% %% * If we want to get all the nodes or the running nodes, we can do that %% while mnesia is offline *if* the node is a disc node. If the node is ram, %% the result will always be [node()]. %% * If we want to get the cluster disc nodes (running or not), we need to %% start mnesia in any case. -%% +%% %% In the following functions we try to get the data from mnesia when we can, %% otherwise we fall back to the cluster status file. @@ -376,13 +431,19 @@ init_db(ClusterNodes, WantDiscNode, Force, Upgrade) -> %% Subsequent node in cluster, catch up ensure_version_ok( rpc:call(AnotherNode, rabbit_version, recorded, [])), - {CopyType, CopyTypeAlt} = case WantDiscNode of - true -> {disc, disc_copies}; - false -> {ram, ram_copies} - end, ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, CopyTypeAlt), - ok = create_local_table_copies(CopyType), + + %% The sequence in which we delete the schema and then the + %% other tables is important: if we delete the schema first + %% when moving to RAM mnesia will loudly complain since it + %% doesn't make much sense to do that. But when moving to + %% disc, we need to move the schema first. + case WantDiscNode of + true -> create_local_table_copy(schema, disc_copies), + create_local_table_copies(disc); + false -> create_local_table_copies(ram), + create_local_table_copy(schema, ram_copies) + end, %% Write the status now that mnesia is running and clustered update_cluster_nodes_status(), @@ -544,21 +605,21 @@ check_cluster_consistency() -> [ThisNode, Node, Node])}}) end end - end, rabbit_mnesia:all_clustered_nodes()). + end, all_clustered_nodes()). %%---------------------------------------------------------------------------- %% Cluster status file functions %%---------------------------------------------------------------------------- %% The cluster node status file contains all we need to know about the cluster: -%% +%% %% * All the clustered nodes %% * The disc nodes %% * The running nodes. %% %% If the current node is a disc node it will be included in the disc nodes %% list. -%% +%% %% We strive to keep the file up to date and we rely on this assumption in %% various situations. Obviously when mnesia is offline the information we have %% will be outdated, but it can't be otherwise. @@ -643,14 +704,24 @@ on_node_down(Node) -> %% Internal helpers %%-------------------------------------------------------------------- -discover_cluster([]) -> - {error, {cannot_discover_cluster, - "The cluster nodes provided are either offline or not running."}}; -discover_cluster([Node | Nodes]) -> - case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of - {badrpc, _Reason} -> discover_cluster(Nodes); - error -> discover_cluster(Nodes); - {ok, Res} -> {ok, Res} +discover_cluster(Nodes) when is_list(Nodes) -> + lists:foldl(fun (_, {ok, Res}) -> {ok, Res}; + (Node, {error, _}) -> discover_cluster(Node) + end, + {error, {cannot_discover_cluster, + "The nodes provided is either offline or not running"}}, + Nodes); +discover_cluster(Node) -> + case Node =:= node() of + true -> + {error, {cannot_discover_cluster, + "You provided the current node as node to cluster with"}}; + false -> + case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of + {badrpc, _Reason} -> discover_cluster([]); + error -> discover_cluster([]); + {ok, Res} -> {ok, Res} + end end. nodes_of_type(Type) -> @@ -968,3 +1039,12 @@ start_mnesia() -> stop_mnesia() -> stopped = mnesia:stop(), ensure_mnesia_not_running(). + +start_and_init_db(ClusterNodes, WantDiscNode, Force) -> + mnesia:start(), + try + ok = init_db(ClusterNodes, WantDiscNode, Force) + after + mnesia:stop() + end, + ok. |
