diff options
| -rw-r--r-- | src/rabbit_mnesia.erl | 92 |
1 files changed, 71 insertions, 21 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6106a110d8..96c9bffdc8 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -423,6 +423,7 @@ cluster_status(WhichNodes) -> node_info() -> {rabbit_misc:otp_release(), rabbit_misc:version(), + mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. node_type() -> @@ -596,21 +597,21 @@ check_cluster_consistency(Node, CheckNodesConsistency) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, {error, _}} -> + {_OTP, Rabbit, Hash, _Status} when is_binary(Hash) -> + %% delegate hash checking implies version mismatch + rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit); + {_OTP, _Rabbit, _Protocol, {error, _}} -> {error, not_found}; - {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency -> - case check_consistency(OTP, Rabbit, Node, Status) of + {_OTP, Rabbit, Protocol, {ok, Status}} when CheckNodesConsistency -> + case check_consistency(Node, Rabbit, Protocol, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} end; - {OTP, Rabbit, {ok, Status}} -> - case check_consistency(OTP, Rabbit) of + {_OTP, Rabbit, Protocol, {ok, Status}} -> + case check_consistency(Node, Rabbit, Protocol) of {error, _} = E -> E; ok -> {ok, Status} - end; - {_OTP, Rabbit, _Hash, _Status} -> - %% delegate hash checking implies version mismatch - rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit) + end end. %%-------------------------------------------------------------------- @@ -763,13 +764,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit) -> +check_consistency(Node, Rabbit, ProtocolVersion) -> rabbit_misc:sequence_error( - [check_rabbit_consistency(Rabbit)]). + [check_mnesia_consistency(Node, ProtocolVersion), + check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Node, Status) -> +check_consistency(Node, Rabbit, ProtocolVersion, Status) -> rabbit_misc:sequence_error( - [check_rabbit_consistency(Rabbit), + [check_mnesia_consistency(Node, ProtocolVersion), + check_rabbit_consistency(Rabbit), check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> @@ -783,6 +786,49 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> [node(), Node, Node])}} end. +check_mnesia_consistency(Node, ProtocolVersion) -> + % If mnesia is running we will just check protocol version + % If it's not running, we don't want it to join cluster until all checks pass + % so we start it without `dir` env variable to prevent + % joining cluster and/or corrupting data + with_running_or_clean_mnesia(fun() -> + case negotiate_protocol([Node]) of + [Node] -> ok; + [] -> + LocalVersion = mnesia:system_info(protocol_version), + {error, {inconsistent_cluster, + rabbit_misc:format("Mnesia protocol negotiation failed." + " Local version: ~p." + " Remote version ~p", + [LocalVersion, ProtocolVersion])}} + end + end). + +negotiate_protocol([Node]) -> + mnesia_monitor:negotiate_protocol([Node]). + +with_running_or_clean_mnesia(Fun) -> + MnesiaRunning = case mnesia:system_info(is_running) of + stopping -> + ensure_mnesia_not_running(), + no; + starting -> + ensure_mnesia_running(), + yes; + Other -> Other + end, + case MnesiaRunning of + yes -> Fun(); + no -> + {ok, MnesiaDir} = application:get_env(mnesia, dir), + application:unset_env(mnesia, dir), + mnesia:start(), + Result = Fun(), + application:stop(mnesia), + application:set_env(mnesia, dir, MnesiaDir), + Result + end. + check_rabbit_consistency(Remote) -> rabbit_version:check_version_consistency( rabbit_misc:version(), Remote, "Rabbit", @@ -818,15 +864,19 @@ find_auto_cluster_node([Node | Nodes]) -> find_auto_cluster_node(Nodes) end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _} = Reason -> Fail("~p~n", [Reason]); + {badrpc, _} = Reason -> + Fail("~p~n", [Reason]); %% old delegate hash check - {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]); - {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]); - {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of - {error, _} -> Fail("versions ~p~n", - [{OTP, RMQ}]); - ok -> {ok, Node} - end + {_OTP, RMQ, Hash, _} when is_binary(Hash) -> + Fail("version ~s~n", [RMQ]); + {_OTP, _RMQ, _Protocol, {error, _} = E} -> + Fail("~p~n", [E]); + {OTP, RMQ, Protocol, _} -> + case check_consistency(Node, RMQ, Protocol) of + {error, _} -> Fail("versions ~p~n", + [{OTP, RMQ}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> |
