summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-05-18 12:01:09 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-05-18 12:01:09 +0100
commitf3de805963356e7f572dc538a45045fab4e280a4 (patch)
tree61b153eb12106abf7fe32a616d810bbf217f1111 /src
parente6a8155834034c60f100ddcf52fce62c792ed162 (diff)
downloadrabbitmq-server-git-f3de805963356e7f572dc538a45045fab4e280a4.tar.gz
moved functions around in rabbit_mnesia
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mnesia.erl726
1 files changed, 379 insertions, 347 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8427ae2add..8cde170287 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -45,53 +45,50 @@
-type(node_type() :: disc_only | disc | ram | unknown).
-type(node_status() :: {[node()], [node()], [node()]}).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+
+%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
-spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_if_running/0 :: () -> 'error' | node_status()).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/4 :: ([node()], boolean(), boolean(), boolean()) -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
+-spec(should_be_disc_node/1 :: ([node()]) -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok' | no_return()).
+
+%% Functions to handle the cluster status file
-spec(write_cluster_nodes_status/1 :: (node_status()) -> 'ok').
-spec(read_cluster_nodes_status/0 :: () -> node_status()).
-spec(initialize_cluster_nodes_status/0 :: () -> 'ok').
-spec(update_cluster_nodes_status/0 :: () -> 'ok').
--spec(is_disc_node/0 :: () -> boolean()).
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(should_be_disc_node/1 :: ([node()]) -> boolean()).
-
--spec(table_names/0 :: () -> [atom()]).
-endif.
%%----------------------------------------------------------------------------
-
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> [{unknown, all_clustered_nodes()}];
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Main interface
+%%----------------------------------------------------------------------------
init() ->
ensure_mnesia_running(),
@@ -104,10 +101,6 @@ init() ->
ok = global:sync(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
-
%% Make the node join a cluster. The node will be reset automatically before we
%% actually cluster it. The nodes provided will be used to find out about the
%% nodes in the cluster.
@@ -171,6 +164,79 @@ join_cluster(DiscoveryNodes, WantDiscNode) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ case not Force andalso is_disc_and_clustered() andalso
+ is_only_disc_node(node())
+ of
+ true -> throw({error, {standalone_ram_node,
+ "You can't reset a node if it's the only disc "
+ "node in a cluster. Please convert another node"
+ " of the cluster to a disc node first."}});
+ false -> ok
+ end,
+ Node = node(),
+ Nodes = all_clustered_nodes(),
+ case Force of
+ true ->
+ ok;
+ false ->
+ ensure_mnesia_dir(),
+ start_mnesia(),
+ %% Reconnecting so that we will get an up to date RunningNodes
+ RunningNodes =
+ try
+ %% Force=true here so that reset still works when clustered
+ %% with a node which is down
+ {_, DiscNodes, _} = read_cluster_nodes_status(),
+ ok = init_db(DiscNodes, should_be_disc_node(DiscNodes),
+ true),
+ running_clustered_nodes()
+ after
+ stop_mnesia()
+ end,
+ leave_cluster(Nodes, RunningNodes),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema)
+ end,
+ %% We need to make sure that we don't end up in a distributed Erlang system
+ %% with nodes while not being in an Mnesia cluster with them. We don't
+ %% handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
+ ok = reset_cluster_nodes_status(),
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok.
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ [{nodes, case mnesia:system_info(is_running) of
+ yes -> [{Key, Nodes} ||
+ {Key, CopyType} <- [{disc_only, disc_only_copies},
+ {disc, disc_copies},
+ {ram, ram_copies}],
+ begin
+ Nodes = nodes_of_type(CopyType),
+ Nodes =/= []
+ end];
+ no -> [{unknown, all_clustered_nodes()}];
+ Reason when Reason =:= starting; Reason =:= stopping ->
+ exit({rabbit_busy, try_again_later})
+ end},
+ {running_nodes, running_clustered_nodes()}].
+
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
RunningNodes = running_clustered_nodes(),
[node()] /= RunningNodes andalso [] /= RunningNodes.
@@ -178,7 +244,7 @@ is_clustered() ->
is_disc_and_clustered() ->
is_disc_node() andalso is_clustered().
-%% The situations with functions that retrieve the nodes in the cluster is
+%% The situation with functions that retrieve the nodes in the cluster is
%% messy.
%%
%% * If we want to get all the nodes or the running nodes, we can do that
@@ -264,6 +330,149 @@ cluster_status_if_running() ->
mnesia:system_info(running_db_nodes)}
end).
+is_disc_node() -> mnesia:system_info(use_dir).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() ->
+ [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ init_db(ClusterNodes, WantDiscNode, Force, true).
+
+%% Take a cluster node config and create the right kind of node - a
+%% standalone disk node, or disk or ram node connected to the
+%% specified cluster nodes. If Force is false, don't allow
+%% connections to offline nodes.
+init_db(ClusterNodes, WantDiscNode, Force, Upgrade) ->
+ UClusterNodes = lists:usort(ClusterNodes),
+ ProperClusterNodes = UClusterNodes -- [node()],
+ case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ProperClusterNodes,
+ "Mnesia could not connect to any disc nodes."}});
+ {ok, Nodes} ->
+ WasDiscNode = is_disc_node(),
+ %% We create a new db (on disk, or in ram) in the first
+ %% two cases and attempt to upgrade the in the other two
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% New ram node; start from scratch
+ ok = create_schema(ram);
+ {[], false, true} ->
+ %% Nothing there at all, start from scratch
+ ok = create_schema(disc);
+ {[], true, true} ->
+ %% We're the first node up
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ensure_schema_integrity();
+ version_not_available -> ok = schema_ok_or_move()
+ end;
+ {[AnotherNode|_], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ {CopyType, CopyTypeAlt} = case WantDiscNode of
+ true -> {disc, disc_copies};
+ false -> {ram, ram_copies}
+ end,
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, CopyTypeAlt),
+ ok = create_local_table_copies(CopyType),
+
+ %% Write the status now that mnesia is running and clustered
+ update_cluster_nodes_status(),
+
+ ok = case Upgrade of
+ true ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok ->
+ ok;
+ %% If we're just starting up a new node we
+ %% won't have a version
+ starting_from_scratch ->
+ rabbit_version:record_desired()
+ end;
+ false ->
+ ok
+ end,
+
+ %% We've taken down mnesia, so ram nodes will need
+ %% to re-sync
+ case is_disc_node() of
+ false -> start_mnesia(),
+ mnesia:change_config(extra_db_nodes,
+ ProperClusterNodes),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+
+ ensure_schema_integrity(),
+ ok
+ end;
+ {error, Reason} ->
+ %% one reason we may end up here is if we try to join
+ %% nodes together that are currently running standalone or
+ %% are members of a different cluster
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
+ end.
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
+
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
+
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
+
empty_ram_only_tables() ->
Node = node(),
lists:foreach(
@@ -275,16 +484,42 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
-discover_cluster([]) ->
- {error, {cannot_discover_cluster,
- "The cluster nodes provided are either offline or not running."}};
-discover_cluster([Node | Nodes]) ->
- case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
- {badrpc, _Reason} -> discover_cluster(Nodes);
- error -> discover_cluster(Nodes);
- {ok, Res} -> {ok, Res}
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
end.
+should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
+
%% This does not guarantee us much, but it avoids some situations that will
%% definitely end in disaster (a node starting and trying to merge its schema
%% to another node which is not clustered with it).
@@ -311,8 +546,113 @@ check_cluster_consistency() ->
end
end, rabbit_mnesia:all_clustered_nodes()).
+%%----------------------------------------------------------------------------
+%% Cluster status file functions
+%%----------------------------------------------------------------------------
+
+%% The cluster node status file contains all we need to know about the cluster:
+%%
+%% * All the clustered nodes
+%% * The disc nodes
+%% * The running nodes.
+%%
+%% If the current node is a disc node it will be included in the disc nodes
+%% list.
+%%
+%% We strive to keep the file up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we have
+%% will be outdated, but it can't be otherwise.
+
+cluster_nodes_status_filename() ->
+ dir() ++ "/cluster_nodes.config".
+
+%% Creates a status file with the default data (one disc node), only if an
+%% existing cluster does not exist.
+initialize_cluster_nodes_status() ->
+ try read_cluster_nodes_status() of
+ _ -> ok
+ catch
+ throw:{error, {cannot_read_cluster_nodes_status, _, enoent}} ->
+ write_cluster_nodes_status({[node()], [node()], [node()]})
+ end.
+
+write_cluster_nodes_status(Status) ->
+ FileName = cluster_nodes_status_filename(),
+ case rabbit_file:write_term_file(FileName, [Status]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_write_cluster_nodes_status,
+ FileName, Reason}})
+ end.
+
+read_cluster_nodes_status() ->
+ FileName = cluster_nodes_status_filename(),
+ case rabbit_file:read_term_file(FileName) of
+ {ok, [{_, _, _} = Status]} -> Status;
+ {error, Reason} ->
+ throw({error, {cannot_read_cluster_nodes_status, FileName, Reason}})
+ end.
+
+reset_cluster_nodes_status() ->
+ FileName = cluster_nodes_status_filename(),
+ case file:delete(FileName) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_delete_cluster_nodes_status,
+ FileName, Reason}})
+ end,
+ write_cluster_nodes_status({[node()], [node()], [node()]}).
+
+%% To update the cluster status when mnesia is running.
+update_cluster_nodes_status() ->
+ {ok, Status} = cluster_status_if_running(),
+ write_cluster_nodes_status(Status).
+
+%% The cluster config contains the nodes that the node should try to contact to
+%% form a cluster, and whether the node should be a disc node. When starting the
+%% database, if the nodes in the cluster status are the initial ones, we try to
+%% read the cluster config.
+read_cluster_nodes_config() ->
+ {AllNodes, DiscNodes, _} = read_cluster_nodes_status(),
+ Node = node(),
+ case AllNodes of
+ [Node] -> {ok, Config} = application:get_env(rabbit, cluster_nodes),
+ Config;
+ _ -> {AllNodes, should_be_disc_node(DiscNodes)}
+ end.
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
+on_node_up(Node) ->
+ update_cluster_nodes_status(),
+ case is_only_disc_node(Node) of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(Node) ->
+ case is_only_disc_node(Node) of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
+%%--------------------------------------------------------------------
+
+discover_cluster([]) ->
+ {error, {cannot_discover_cluster,
+ "The cluster nodes provided are either offline or not running."}};
+discover_cluster([Node | Nodes]) ->
+ case rpc:call(Node, rabbit_mnesia, cluster_status_if_running, []) of
+ {badrpc, _Reason} -> discover_cluster(Nodes);
+ error -> discover_cluster(Nodes);
+ {ok, Res} -> {ok, Res}
+ end.
+
nodes_of_type(Type) ->
%% This function should return the nodes of a certain type (ram,
%% disc or disc_only) in the current cluster. The type of nodes
@@ -438,68 +778,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -535,158 +818,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node status file contains all we need to know about the cluster:
-%%
-%% * All the clustered nodes
-%% * The disc nodes
-%% * The running nodes.
-%%
-%% If the current node is a disc node it will be included in the disc nodes
-%% list.
-%%
-%% We strive to keep the file up to date and we rely on this assumption in
-%% various situations. Obviously when mnesia is offline the information we have
-%% will be outdated, but it can't be otherwise.
-
-cluster_nodes_status_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-%% Creates a status file with the default data (one disc node), only if an
-%% existing cluster does not exist.
-initialize_cluster_nodes_status() ->
- try read_cluster_nodes_status() of
- _ -> ok
- catch
- throw:{error, {cannot_read_cluster_nodes_status, _, enoent}} ->
- write_cluster_nodes_status({[node()], [node()], [node()]})
- end.
-
-write_cluster_nodes_status(Status) ->
- FileName = cluster_nodes_status_filename(),
- case rabbit_file:write_term_file(FileName, [Status]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_write_cluster_nodes_status,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_status() ->
- FileName = cluster_nodes_status_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [{_, _, _} = Status]} -> Status;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_status, FileName, Reason}})
- end.
-
-reset_cluster_nodes_status() ->
- FileName = cluster_nodes_status_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_status,
- FileName, Reason}})
- end,
- write_cluster_nodes_status({[node()], [node()], [node()]}).
-
-%% To update the cluster status when mnesia is running.
-update_cluster_nodes_status() ->
- {ok, Status} = cluster_status_if_running(),
- write_cluster_nodes_status(Status).
-
-%% The cluster config contains the nodes that the node should try to contact to
-%% form a cluster, and whether the node should be a disc node. When starting the
-%% database, if the nodes in the cluster status are the initial ones, we try to
-%% read the cluster config.
-read_cluster_nodes_config() ->
- {AllNodes, DiscNodes, _} = read_cluster_nodes_status(),
- Node = node(),
- case AllNodes of
- [Node] -> {ok, Config} = application:get_env(rabbit, cluster_nodes),
- Config;
- _ -> {AllNodes, should_be_disc_node(DiscNodes)}
- end.
-
-init_db(ClusterNodes, WantDiscNode, Force) ->
- init_db(ClusterNodes, WantDiscNode, Force, true).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, WantDiscNode, Force, Upgrade) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} = case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- %% Write the status now that mnesia is running and clustered
- update_cluster_nodes_status(),
-
- ok = case Upgrade of
- true ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok ->
- ok;
- %% If we're just starting up a new node we
- %% won't have a version
- starting_from_scratch ->
- rabbit_version:record_desired()
- end;
- false ->
- ok
- end,
-
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -725,11 +856,6 @@ create_schema(Type) ->
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(DiscNodes) ->
- DiscNodes == [] orelse lists:member(node(), DiscNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -751,25 +877,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -818,68 +925,6 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end.
-
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_disc_and_clustered() andalso
- is_only_disc_node(node())
- of
- true -> throw({error, {standalone_ram_node,
- "You can't reset a node if it's the only disc "
- "node in a cluster. Please convert another node"
- " of the cluster to a disc node first."}});
- false -> ok
- end,
- Node = node(),
- Nodes = all_clustered_nodes(),
- case Force of
- true ->
- ok;
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- %% Reconnecting so that we will get an up to date RunningNodes
- RunningNodes =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- {_, DiscNodes, _} = read_cluster_nodes_status(),
- ok = init_db(DiscNodes, should_be_disc_node(DiscNodes),
- true),
- running_clustered_nodes()
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
- end,
- %% We need to make sure that we don't end up in a distributed Erlang system
- %% with nodes while not being in an Mnesia cluster with them. We don't
- %% handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
- ok = reset_cluster_nodes_status(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
-
leave_cluster([], _) -> ok;
leave_cluster(Nodes, RunningNodes0) ->
case RunningNodes0 -- [node()] of
@@ -912,19 +957,6 @@ wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- update_cluster_nodes_status(),
- case is_only_disc_node(Node) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
- false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node) of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
- end.
-
is_only_disc_node(Node) ->
Nodes = running_clustered_disc_nodes(),
[Node] =:= Nodes.