diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-05-18 12:01:09 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-05-18 12:01:09 +0100 |
| commit | f3de805963356e7f572dc538a45045fab4e280a4 (patch) | |
| tree | 61b153eb12106abf7fe32a616d810bbf217f1111 | |
| parent | e6a8155834034c60f100ddcf52fce62c792ed162 (diff) | |
| download | rabbitmq-server-git-f3de805963356e7f572dc538a45045fab4e280a4.tar.gz | |
moved functions around in rabbit_mnesia
| -rw-r--r-- | src/rabbit_mnesia.erl | 726 |
1 files changed, 379 insertions, 347 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8427ae2add..8cde170287 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -45,53 +45,50 @@ -type(node_type() :: disc_only | disc | ram | unknown). -type(node_status() :: {[node()], [node()], [node()]}). --spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). --spec(dir/0 :: () -> file:filename()). --spec(ensure_mnesia_dir/0 :: () -> 'ok'). + +%% Main interface -spec(init/0 :: () -> 'ok'). --spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok'). --spec(is_db_empty/0 :: () -> boolean()). -spec(join_cluster/2 :: ([node()], boolean()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). + +%% Various queries to get the status of the db +-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]}]). +-spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). -spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(is_disc_node/0 :: () -> boolean()). +-spec(dir/0 :: () -> file:filename()). +-spec(table_names/0 :: () -> [atom()]). +-spec(cluster_status_if_running/0 :: () -> 'error' | node_status()). + +%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' +-spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok'). +-spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(should_be_disc_node/1 :: ([node()]) -> boolean()). +-spec(check_cluster_consistency/0 :: () -> 'ok' | no_return()). + +%% Functions to handle the cluster status file -spec(write_cluster_nodes_status/1 :: (node_status()) -> 'ok'). -spec(read_cluster_nodes_status/0 :: () -> node_status()). -spec(initialize_cluster_nodes_status/0 :: () -> 'ok'). -spec(update_cluster_nodes_status/0 :: () -> 'ok'). --spec(is_disc_node/0 :: () -> boolean()). + +%% Hooks used in `rabbit_node_monitor' -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(should_be_disc_node/1 :: ([node()]) -> boolean()). - --spec(table_names/0 :: () -> [atom()]). -endif. %%---------------------------------------------------------------------------- - -status() -> - [{nodes, case mnesia:system_info(is_running) of - yes -> [{Key, Nodes} || - {Key, CopyType} <- [{disc_only, disc_only_copies}, - {disc, disc_copies}, - {ram, ram_copies}], - begin - Nodes = nodes_of_type(CopyType), - Nodes =/= [] - end]; - no -> [{unknown, all_clustered_nodes()}]; - Reason when Reason =:= starting; Reason =:= stopping -> - exit({rabbit_busy, try_again_later}) - end}, - {running_nodes, running_clustered_nodes()}]. +%% Main interface +%%---------------------------------------------------------------------------- init() -> ensure_mnesia_running(), @@ -104,10 +101,6 @@ init() -> ok = global:sync(), ok. -is_db_empty() -> - lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, - table_names()). - %% Make the node join a cluster. The node will be reset automatically before we %% actually cluster it. The nodes provided will be used to find out about the %% nodes in the cluster. @@ -171,6 +164,79 @@ join_cluster(DiscoveryNodes, WantDiscNode) -> reset() -> reset(false). force_reset() -> reset(true). +reset(Force) -> + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", + [if Force -> " forcefully"; + true -> "" + end]), + ensure_mnesia_not_running(), + case not Force andalso is_disc_and_clustered() andalso + is_only_disc_node(node()) + of + true -> throw({error, {standalone_ram_node, + "You can't reset a node if it's the only disc " + "node in a cluster. Please convert another node" + " of the cluster to a disc node first."}}); + false -> ok + end, + Node = node(), + Nodes = all_clustered_nodes(), + case Force of + true -> + ok; + false -> + ensure_mnesia_dir(), + start_mnesia(), + %% Reconnecting so that we will get an up to date RunningNodes + RunningNodes = + try + %% Force=true here so that reset still works when clustered + %% with a node which is down + {_, DiscNodes, _} = read_cluster_nodes_status(), + ok = init_db(DiscNodes, should_be_disc_node(DiscNodes), + true), + running_clustered_nodes() + after + stop_mnesia() + end, + leave_cluster(Nodes, RunningNodes), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema) + end, + %% We need to make sure that we don't end up in a distributed Erlang system + %% with nodes while not being in an Mnesia cluster with them. We don't + %% handle that well. + [erlang:disconnect_node(N) || N <- Nodes], + ok = reset_cluster_nodes_status(), + %% remove persisted messages and any other garbage we find + ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), + ok. + +%%---------------------------------------------------------------------------- +%% Queries +%%---------------------------------------------------------------------------- + +status() -> + [{nodes, case mnesia:system_info(is_running) of + yes -> [{Key, Nodes} || + {Key, CopyType} <- [{disc_only, disc_only_copies}, + {disc, disc_copies}, + {ram, ram_copies}], + begin + Nodes = nodes_of_type(CopyType), + Nodes =/= [] + end]; + no -> [{unknown, all_clustered_nodes()}]; + Reason when Reason =:= starting; Reason =:= stopping -> + exit({rabbit_busy, try_again_later}) + end}, + {running_nodes, running_clustered_nodes()}]. + + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + is_clustered() -> RunningNodes = running_clustered_nodes(), [node()] /= RunningNodes andalso [] /= RunningNodes. @@ -178,7 +244,7 @@ is_clustered() -> is_disc_and_clustered() -> is_disc_node() andalso is_clustered(). -%% The situations with functions that retrieve the nodes in the cluster is +%% The situation with functions that retrieve the nodes in the cluster is %% messy. %% %% * If we want to get all the nodes or the running nodes, we can do that @@ -264,6 +330,149 @@ cluster_status_if_running() -> mnesia:system_info(running_db_nodes)} end). +is_disc_node() -> mnesia:system_info(use_dir). + +dir() -> mnesia:system_info(directory). + +table_names() -> + [Tab || {Tab, _} <- table_definitions()]. + +%%---------------------------------------------------------------------------- +%% Operations on the db +%%---------------------------------------------------------------------------- + +init_db(ClusterNodes, WantDiscNode, Force) -> + init_db(ClusterNodes, WantDiscNode, Force, true). + +%% Take a cluster node config and create the right kind of node - a +%% standalone disk node, or disk or ram node connected to the +%% specified cluster nodes. If Force is false, don't allow +%% connections to offline nodes. +init_db(ClusterNodes, WantDiscNode, Force, Upgrade) -> + UClusterNodes = lists:usort(ClusterNodes), + ProperClusterNodes = UClusterNodes -- [node()], + case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of + {ok, []} when not Force andalso ProperClusterNodes =/= [] -> + throw({error, {failed_to_cluster_with, ProperClusterNodes, + "Mnesia could not connect to any disc nodes."}}); + {ok, Nodes} -> + WasDiscNode = is_disc_node(), + %% We create a new db (on disk, or in ram) in the first + %% two cases and attempt to upgrade the in the other two + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% New ram node; start from scratch + ok = create_schema(ram); + {[], false, true} -> + %% Nothing there at all, start from scratch + ok = create_schema(disc); + {[], true, true} -> + %% We're the first node up + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ensure_schema_integrity(); + version_not_available -> ok = schema_ok_or_move() + end; + {[AnotherNode|_], _, _} -> + %% Subsequent node in cluster, catch up + ensure_version_ok( + rpc:call(AnotherNode, rabbit_version, recorded, [])), + {CopyType, CopyTypeAlt} = case WantDiscNode of + true -> {disc, disc_copies}; + false -> {ram, ram_copies} + end, + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, CopyTypeAlt), + ok = create_local_table_copies(CopyType), + + %% Write the status now that mnesia is running and clustered + update_cluster_nodes_status(), + + ok = case Upgrade of + true -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> + ok; + %% If we're just starting up a new node we + %% won't have a version + starting_from_scratch -> + rabbit_version:record_desired() + end; + false -> + ok + end, + + %% We've taken down mnesia, so ram nodes will need + %% to re-sync + case is_disc_node() of + false -> start_mnesia(), + mnesia:change_config(extra_db_nodes, + ProperClusterNodes), + wait_for_replicated_tables(); + true -> ok + end, + + ensure_schema_integrity(), + ok + end; + {error, Reason} -> + %% one reason we may end up here is if we try to join + %% nodes together that are currently running standalone or + %% are members of a different cluster + throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) + end. + +ensure_mnesia_dir() -> + MnesiaDir = dir() ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> + ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) + end. + +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) + end. + +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other + end. + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -275,16 +484,42 @@ empty_ram_only_tables() -> end, table_names()), ok. -discover_cluster([]) -> - {error, {cannot_discover_cluster, - "The cluster nodes provided are either offline or not running."}}; -discover_cluster([Node | Nodes]) -> - case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of - {badrpc, _Reason} -> discover_cluster(Nodes); - error -> discover_cluster(Nodes); - {ok, Res} -> {ok, Res} +create_tables() -> create_tables(disc). + +create_tables(Type) -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, + table_definitions(Type)), + ok. + +copy_db(Destination) -> + ok = ensure_mnesia_not_running(), + rabbit_file:recursive_copy(dir(), Destination). + +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) end. +should_be_disc_node(DiscNodes) -> + DiscNodes == [] orelse lists:member(node(), DiscNodes). + %% This does not guarantee us much, but it avoids some situations that will %% definitely end in disaster (a node starting and trying to merge its schema %% to another node which is not clustered with it). @@ -311,8 +546,113 @@ check_cluster_consistency() -> end end, rabbit_mnesia:all_clustered_nodes()). +%%---------------------------------------------------------------------------- +%% Cluster status file functions +%%---------------------------------------------------------------------------- + +%% The cluster node status file contains all we need to know about the cluster: +%% +%% * All the clustered nodes +%% * The disc nodes +%% * The running nodes. +%% +%% If the current node is a disc node it will be included in the disc nodes +%% list. +%% +%% We strive to keep the file up to date and we rely on this assumption in +%% various situations. Obviously when mnesia is offline the information we have +%% will be outdated, but it can't be otherwise. + +cluster_nodes_status_filename() -> + dir() ++ "/cluster_nodes.config". + +%% Creates a status file with the default data (one disc node), only if an +%% existing cluster does not exist. +initialize_cluster_nodes_status() -> + try read_cluster_nodes_status() of + _ -> ok + catch + throw:{error, {cannot_read_cluster_nodes_status, _, enoent}} -> + write_cluster_nodes_status({[node()], [node()], [node()]}) + end. + +write_cluster_nodes_status(Status) -> + FileName = cluster_nodes_status_filename(), + case rabbit_file:write_term_file(FileName, [Status]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_write_cluster_nodes_status, + FileName, Reason}}) + end. + +read_cluster_nodes_status() -> + FileName = cluster_nodes_status_filename(), + case rabbit_file:read_term_file(FileName) of + {ok, [{_, _, _} = Status]} -> Status; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_status, FileName, Reason}}) + end. + +reset_cluster_nodes_status() -> + FileName = cluster_nodes_status_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> + throw({error, {cannot_delete_cluster_nodes_status, + FileName, Reason}}) + end, + write_cluster_nodes_status({[node()], [node()], [node()]}). + +%% To update the cluster status when mnesia is running. +update_cluster_nodes_status() -> + {ok, Status} = cluster_status_if_running(), + write_cluster_nodes_status(Status). + +%% The cluster config contains the nodes that the node should try to contact to +%% form a cluster, and whether the node should be a disc node. When starting the +%% database, if the nodes in the cluster status are the initial ones, we try to +%% read the cluster config. +read_cluster_nodes_config() -> + {AllNodes, DiscNodes, _} = read_cluster_nodes_status(), + Node = node(), + case AllNodes of + [Node] -> {ok, Config} = application:get_env(rabbit, cluster_nodes), + Config; + _ -> {AllNodes, should_be_disc_node(DiscNodes)} + end. + +%%-------------------------------------------------------------------- +%% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- +on_node_up(Node) -> + update_cluster_nodes_status(), + case is_only_disc_node(Node) of + true -> rabbit_log:info("cluster contains disc nodes again~n"); + false -> ok + end. + +on_node_down(Node) -> + case is_only_disc_node(Node) of + true -> rabbit_log:info("only running disc node went down~n"); + false -> ok + end. + +%%-------------------------------------------------------------------- +%% Internal helpers +%%-------------------------------------------------------------------- + +discover_cluster([]) -> + {error, {cannot_discover_cluster, + "The cluster nodes provided are either offline or not running."}}; +discover_cluster([Node | Nodes]) -> + case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of + {badrpc, _Reason} -> discover_cluster(Nodes); + error -> discover_cluster(Nodes); + {ok, Res} -> {ok, Res} + end. + nodes_of_type(Type) -> %% This function should return the nodes of a certain type (ram, %% disc or disc_only) in the current cluster. The type of nodes @@ -438,68 +778,11 @@ queue_name_match() -> resource_match(Kind) -> #resource{kind = Kind, _='_'}. -table_names() -> - [Tab || {Tab, _} <- table_definitions()]. - replicated_table_names() -> [Tab || {Tab, TabDef} <- table_definitions(), not lists:member({local_content, true}, TabDef) ]. -dir() -> mnesia:system_info(directory). - -ensure_mnesia_dir() -> - MnesiaDir = dir() ++ "/", - case filelib:ensure_dir(MnesiaDir) of - {error, Reason} -> - throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> - ok - end. - -ensure_mnesia_running() -> - case mnesia:system_info(is_running) of - yes -> - ok; - starting -> - wait_for(mnesia_running), - ensure_mnesia_running(); - Reason when Reason =:= no; Reason =:= stopping -> - throw({error, mnesia_not_running}) - end. - -ensure_mnesia_not_running() -> - case mnesia:system_info(is_running) of - no -> - ok; - stopping -> - wait_for(mnesia_not_running), - ensure_mnesia_not_running(); - Reason when Reason =:= yes; Reason =:= starting -> - throw({error, mnesia_unexpectedly_running}) - end. - -ensure_schema_integrity() -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) - end. - -check_schema_integrity() -> - Tables = mnesia:system_info(tables), - case check_tables(fun (Tab, TabDef) -> - case lists:member(Tab, Tables) of - false -> {error, {table_missing, Tab}}; - true -> check_table_attributes(Tab, TabDef) - end - end) of - ok -> ok = wait_for_tables(), - check_tables(fun check_table_content/2); - Other -> Other - end. - check_table_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of @@ -535,158 +818,6 @@ check_tables(Fun) -> Errors -> {error, Errors} end. -%% The cluster node status file contains all we need to know about the cluster: -%% -%% * All the clustered nodes -%% * The disc nodes -%% * The running nodes. -%% -%% If the current node is a disc node it will be included in the disc nodes -%% list. -%% -%% We strive to keep the file up to date and we rely on this assumption in -%% various situations. Obviously when mnesia is offline the information we have -%% will be outdated, but it can't be otherwise. - -cluster_nodes_status_filename() -> - dir() ++ "/cluster_nodes.config". - -%% Creates a status file with the default data (one disc node), only if an -%% existing cluster does not exist. -initialize_cluster_nodes_status() -> - try read_cluster_nodes_status() of - _ -> ok - catch - throw:{error, {cannot_read_cluster_nodes_status, _, enoent}} -> - write_cluster_nodes_status({[node()], [node()], [node()]}) - end. - -write_cluster_nodes_status(Status) -> - FileName = cluster_nodes_status_filename(), - case rabbit_file:write_term_file(FileName, [Status]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_write_cluster_nodes_status, - FileName, Reason}}) - end. - -read_cluster_nodes_status() -> - FileName = cluster_nodes_status_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [{_, _, _} = Status]} -> Status; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_status, FileName, Reason}}) - end. - -reset_cluster_nodes_status() -> - FileName = cluster_nodes_status_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> - throw({error, {cannot_delete_cluster_nodes_status, - FileName, Reason}}) - end, - write_cluster_nodes_status({[node()], [node()], [node()]}). - -%% To update the cluster status when mnesia is running. -update_cluster_nodes_status() -> - {ok, Status} = cluster_status_if_running(), - write_cluster_nodes_status(Status). - -%% The cluster config contains the nodes that the node should try to contact to -%% form a cluster, and whether the node should be a disc node. When starting the -%% database, if the nodes in the cluster status are the initial ones, we try to -%% read the cluster config. -read_cluster_nodes_config() -> - {AllNodes, DiscNodes, _} = read_cluster_nodes_status(), - Node = node(), - case AllNodes of - [Node] -> {ok, Config} = application:get_env(rabbit, cluster_nodes), - Config; - _ -> {AllNodes, should_be_disc_node(DiscNodes)} - end. - -init_db(ClusterNodes, WantDiscNode, Force) -> - init_db(ClusterNodes, WantDiscNode, Force, true). - -%% Take a cluster node config and create the right kind of node - a -%% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. If Force is false, don't allow -%% connections to offline nodes. -init_db(ClusterNodes, WantDiscNode, Force, Upgrade) -> - UClusterNodes = lists:usort(ClusterNodes), - ProperClusterNodes = UClusterNodes -- [node()], - case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of - {ok, []} when not Force andalso ProperClusterNodes =/= [] -> - throw({error, {failed_to_cluster_with, ProperClusterNodes, - "Mnesia could not connect to any disc nodes."}}); - {ok, Nodes} -> - WasDiscNode = is_disc_node(), - %% We create a new db (on disk, or in ram) in the first - %% two cases and attempt to upgrade the in the other two - case {Nodes, WasDiscNode, WantDiscNode} of - {[], _, false} -> - %% New ram node; start from scratch - ok = create_schema(ram); - {[], false, true} -> - %% Nothing there at all, start from scratch - ok = create_schema(disc); - {[], true, true} -> - %% We're the first node up - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ensure_schema_integrity(); - version_not_available -> ok = schema_ok_or_move() - end; - {[AnotherNode|_], _, _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), - {CopyType, CopyTypeAlt} = case WantDiscNode of - true -> {disc, disc_copies}; - false -> {ram, ram_copies} - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, CopyTypeAlt), - ok = create_local_table_copies(CopyType), - - %% Write the status now that mnesia is running and clustered - update_cluster_nodes_status(), - - ok = case Upgrade of - true -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> - ok; - %% If we're just starting up a new node we - %% won't have a version - starting_from_scratch -> - rabbit_version:record_desired() - end; - false -> - ok - end, - - %% We've taken down mnesia, so ram nodes will need - %% to re-sync - case is_disc_node() of - false -> start_mnesia(), - mnesia:change_config(extra_db_nodes, - ProperClusterNodes), - wait_for_replicated_tables(); - true -> ok - end, - - ensure_schema_integrity(), - ok - end; - {error, Reason} -> - %% one reason we may end up here is if we try to join - %% nodes together that are currently running standalone or - %% are members of a different cluster - throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -725,11 +856,6 @@ create_schema(Type) -> ensure_schema_integrity(), ok = rabbit_version:record_desired(). -is_disc_node() -> mnesia:system_info(use_dir). - -should_be_disc_node(DiscNodes) -> - DiscNodes == [] orelse lists:member(node(), DiscNodes). - move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), @@ -751,25 +877,6 @@ move_db() -> start_mnesia(), ok. -copy_db(Destination) -> - ok = ensure_mnesia_not_running(), - rabbit_file:recursive_copy(dir(), Destination). - -create_tables() -> create_tables(disc). - -create_tables(Type) -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, - table_definitions(Type)), - ok. - copy_type_to_ram(TabDef) -> [{disc_copies, []}, {ram_copies, [node()]} | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. @@ -818,68 +925,6 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). - -wait_for_tables() -> wait_for_tables(table_names()). - -wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> - ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end. - -reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", - [if Force -> " forcefully"; - true -> "" - end]), - ensure_mnesia_not_running(), - case not Force andalso is_disc_and_clustered() andalso - is_only_disc_node(node()) - of - true -> throw({error, {standalone_ram_node, - "You can't reset a node if it's the only disc " - "node in a cluster. Please convert another node" - " of the cluster to a disc node first."}}); - false -> ok - end, - Node = node(), - Nodes = all_clustered_nodes(), - case Force of - true -> - ok; - false -> - ensure_mnesia_dir(), - start_mnesia(), - %% Reconnecting so that we will get an up to date RunningNodes - RunningNodes = - try - %% Force=true here so that reset still works when clustered - %% with a node which is down - {_, DiscNodes, _} = read_cluster_nodes_status(), - ok = init_db(DiscNodes, should_be_disc_node(DiscNodes), - true), - running_clustered_nodes() - after - stop_mnesia() - end, - leave_cluster(Nodes, RunningNodes), - rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), - cannot_delete_schema) - end, - %% We need to make sure that we don't end up in a distributed Erlang system - %% with nodes while not being in an Mnesia cluster with them. We don't - %% handle that well. - [erlang:disconnect_node(N) || N <- Nodes], - ok = reset_cluster_nodes_status(), - %% remove persisted messages and any other garbage we find - ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok. - leave_cluster([], _) -> ok; leave_cluster(Nodes, RunningNodes0) -> case RunningNodes0 -- [node()] of @@ -912,19 +957,6 @@ wait_for(Condition) -> error_logger:info_msg("Waiting for ~p...~n", [Condition]), timer:sleep(1000). -on_node_up(Node) -> - update_cluster_nodes_status(), - case is_only_disc_node(Node) of - true -> rabbit_log:info("cluster contains disc nodes again~n"); - false -> ok - end. - -on_node_down(Node) -> - case is_only_disc_node(Node) of - true -> rabbit_log:info("only running disc node went down~n"); - false -> ok - end. - is_only_disc_node(Node) -> Nodes = running_clustered_disc_nodes(), [Node] =:= Nodes. |
