diff options
| -rw-r--r-- | src/rabbit_control.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 87 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 4 |
3 files changed, 72 insertions, 33 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 74581e1511..1bab7518b9 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -25,6 +25,7 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(RAM_OPT, "--ram"). -define(GLOBAL_QUERIES, [{"Connections", rabbit_networking, connection_info_all, @@ -60,7 +61,8 @@ start() -> {[Command0 | Args], Opts} = case rabbit_misc:get_options([{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}], + {option, ?VHOST_OPT, "/"}, + {flag, ?RAM_OPT}], init:get_plain_arguments()) of {[], _Opts} -> usage(); CmdArgsAndOpts -> CmdArgsAndOpts @@ -181,10 +183,18 @@ action(force_reset, Node, [], _Opts, Inform) -> call(Node, {rabbit_mnesia, force_reset, []}); action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> + io:format("'cluster' is deprecated, please us 'join_cluster'.~n"), ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + DiscNode = rabbit_mnesia:should_be_disc_node_legacy(ClusterNodes), Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); + rpc_call(Node, rabbit_mnesia, cluster, [{ClusterNodes, DiscNode}]); + +action(join_cluster, Node, ClusterNodeSs, Opts, Inform) -> + ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + DiscNode = not proplists:get_bool(?RAM_OPT, Opts), + Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), + rpc_call(Node, rabbit_mnesia, cluster, [{ClusterNodes, DiscNode}]); action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f8533b0481..0587daa3a2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -24,7 +24,7 @@ create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, running_nodes_filename/0, is_disc_node/0, on_node_down/1, - on_node_up/1]). + on_node_up/1, should_be_disc_node_legacy/1]). -export([table_names/0]). @@ -38,15 +38,16 @@ -ifdef(use_specs). --export_type([node_type/0]). +-export_type([node_type/0, node_config/0]). -type(node_type() :: disc_only | disc | ram | unknown). +-type(node_config() :: {[node()], boolean()}). -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | {'running_nodes', [node()]}]). -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). +-spec(init_db/3 :: (node_config(), boolean(), boolean()) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). @@ -58,14 +59,15 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). --spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). --spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(create_cluster_nodes_config/1 :: (node_config()) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> node_config()). -spec(record_running_nodes/0 :: () -> 'ok'). -spec(read_previously_running_nodes/0 :: () -> [node()]). -spec(running_nodes_filename/0 :: () -> file:filename()). -spec(is_disc_node/0 :: () -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(should_be_disc_node_legacy/1 :: ([node()]) -> boolean()). -spec(table_names/0 :: () -> [atom()]). @@ -95,8 +97,8 @@ status() -> init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - Nodes = read_cluster_nodes_config(), - ok = init_db(Nodes, should_be_disc_node(Nodes)), + Config = {_, DiscNode} = read_cluster_nodes_config(), + ok = init_db(Config, DiscNode), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. @@ -113,20 +115,29 @@ is_db_empty() -> %% include the node itself if it is to be a disk rather than a ram %% node. If Force is false, only connections to online nodes are %% allowed. -cluster(ClusterNodes) -> +cluster({DiscoveryNodes, DiscNode}) -> + case is_only_disc_node(node(), false) andalso not DiscNode of + true -> throw({error, + {standalone_ram_node, + "You can't cluster a node if it's the only " + "disc node in its existing cluster."}}); + _ -> ok + end, + + %% TODO: Emit some warning/error if this node is in DiscoveryNodes + ProperDiscoveryNodes = DiscoveryNodes -- [node()], + ClusterNodes = case discover_cluster(ProperDiscoveryNodes) of + {ok, ClusterNodes0} -> ClusterNodes0; + {error, Reason} -> throw({error, Reason}) + end, + Config = {ClusterNodes, DiscNode}, + rabbit_misc:local_info_msg("Clustering with ~p~s~n", [ClusterNodes]), ensure_mnesia_not_running(), ensure_mnesia_dir(), - case is_clustered() andalso is_only_disc_node(node(), false) andalso - not should_be_disc_node(ClusterNodes) - of - true -> log_both("last running disc node leaving cluster"); - _ -> ok - end, - %% Wipe mnesia if we're changing type from disc to ram - case {is_disc_node(), should_be_disc_node(ClusterNodes)} of + case {is_disc_node(), DiscNode} of {true, false} -> rabbit_misc:with_local_io( fun () -> error_logger:warning_msg( "changing node type; wiping " @@ -158,8 +169,8 @@ cluster(ClusterNodes) -> %% Join the cluster start_mnesia(), try - ok = init_db(ClusterNodes, false), - ok = create_cluster_nodes_config(ClusterNodes) + ok = init_db(Config, false), + ok = create_cluster_nodes_config(Config) after stop_mnesia() end, @@ -182,6 +193,11 @@ all_clustered_nodes() -> running_clustered_nodes() -> mnesia:system_info(running_db_nodes). +running_clustered_disc_nodes() -> + RunningSet = sets:from_list(running_clustered_nodes()), + DiscSet = sets:from_list(nodes_of_type(disc_copies)), + sets:to_list(sets:intersection(RunningSet, DiscSet)). + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -193,6 +209,14 @@ empty_ram_only_tables() -> end, table_names()), ok. +discover_cluster([]) -> + {error, cannot_discover_cluster}; +discover_cluster([Node | Nodes]) -> + case rpc:call(Node, rabbit_mnesia, all_clustered_nodes, []) of + {badrpc, _Reason} -> discover_cluster(Nodes); + Res -> {ok, Res} + end. + %%-------------------------------------------------------------------- nodes_of_type(Type) -> @@ -429,9 +453,9 @@ check_tables(Fun) -> cluster_nodes_config_filename() -> dir() ++ "/cluster_nodes.config". -create_cluster_nodes_config(ClusterNodes) -> +create_cluster_nodes_config(Config) -> FileName = cluster_nodes_config_filename(), - case rabbit_file:write_term_file(FileName, [ClusterNodes]) of + case rabbit_file:write_term_file(FileName, [Config]) of ok -> ok; {error, Reason} -> throw({error, {cannot_create_cluster_nodes_config, @@ -439,12 +463,17 @@ create_cluster_nodes_config(ClusterNodes) -> end. read_cluster_nodes_config() -> + Convert = fun (Config = {_, _}) -> Config; + (ClusterNodes) -> + rabbit_log:warning("reading legacy node config"), + {ClusterNodes, should_be_disc_node_legacy(ClusterNodes)} + end, FileName = cluster_nodes_config_filename(), case rabbit_file:read_term_file(FileName) of - {ok, [ClusterNodes]} -> ClusterNodes; + {ok, [Config]} -> Convert(Config); {error, enoent} -> - {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), - ClusterNodes; + {ok, Config} = application:get_env(rabbit, cluster_nodes), + Convert(Config); {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, FileName, Reason}}) @@ -495,7 +524,7 @@ init_db(ClusterNodes, Force) -> init_db(ClusterNodes, Force, true). %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force, Upgrade) -> +init_db({ClusterNodes, WantDiscNode}, Force, Upgrade) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -504,7 +533,6 @@ init_db(ClusterNodes, Force, Upgrade) -> "Mnesia could not connect to any disc nodes."}}); {ok, Nodes} -> WasDiscNode = is_disc_node(), - WantDiscNode = should_be_disc_node(ClusterNodes), %% We create a new db (on disk, or in ram) in the first %% two cases and attempt to upgrade the in the other two case {Nodes, WasDiscNode, WantDiscNode} of @@ -607,7 +635,10 @@ create_schema(Type) -> is_disc_node() -> mnesia:system_info(use_dir). -should_be_disc_node(ClusterNodes) -> +%% We're not using this anymore - the config stores whether the node is disc or +%% not - but we still need it when reading old-style configs and in +%% `rabbit_control'. +should_be_disc_node_legacy(ClusterNodes) -> ClusterNodes == [] orelse lists:member(node(), ClusterNodes). move_db() -> @@ -793,9 +824,7 @@ on_node_down(Node) -> end. is_only_disc_node(Node, _MnesiaRunning = true) -> - RunningSet = sets:from_list(running_clustered_nodes()), - DiscSet = sets:from_list(nodes_of_type(disc_copies)), - [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); + [Node] =:= running_clustered_disc_nodes(); is_only_disc_node(Node, false) -> start_mnesia(), Res = is_only_disc_node(Node, true), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 25efb91a13..273129904f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -123,8 +123,8 @@ remove_backup() -> maybe_upgrade_mnesia() -> %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point %% if we are a RAM node since Mnesia has not started yet. - AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ - rabbit_mnesia:read_cluster_nodes_config()), + {ClusterNodes, _DiscNode} = rabbit_mnesia:read_cluster_nodes_config(), + AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ ClusterNodes), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; |
