diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-04-20 14:21:13 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-04-20 14:21:13 +0100 |
| commit | 8f4cfceb8ad933d453ab692e6386560e18c1960b (patch) | |
| tree | b8964a4d8dd36f6a8dd6801a8e63f31ee3b8bfaf | |
| parent | c15a4adf07441c7cd6f3819c296c192f4a54c5e4 (diff) | |
| parent | b809d2e314d523aa578bbed637ce4a35afb980c6 (diff) | |
| download | rabbitmq-server-git-8f4cfceb8ad933d453ab692e6386560e18c1960b.tar.gz | |
Merge branch 'stable' into rabbitmq-auth-backend-ldap-13
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 1 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
4 files changed, 99 insertions, 31 deletions
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 301f7a1fc7..06eeb50837 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -1463,6 +1463,7 @@ get_monitor() { # Rabbit is running but is not connected to master # Failing to avoid split brain ocf_log err "${LH} rabbit node is running out of the cluster" + stop_server_process rc=$OCF_ERR_GENERIC fi fi diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index afd0508aac..6a57f6bb2c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -423,6 +423,7 @@ cluster_status(WhichNodes) -> node_info() -> {rabbit_misc:otp_release(), rabbit_misc:version(), + mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. node_type() -> @@ -593,26 +594,37 @@ check_cluster_consistency() -> end. check_cluster_consistency(Node, CheckNodesConsistency) -> - case rpc:call(Node, rabbit_mnesia, node_info, []) of + case remote_node_info(Node) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, {error, _}} -> + {_OTP, Rabbit, DelegateModuleHash, _Status} when is_binary(DelegateModuleHash) -> + %% when a delegate module .beam file hash is present + %% in the tuple, we are dealing with an old version + rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit); + {_OTP, _Rabbit, _Protocol, {error, _}} -> {error, not_found}; - {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency -> - case check_consistency(OTP, Rabbit, Node, Status) of + {OTP, Rabbit, Protocol, {ok, Status}} when CheckNodesConsistency -> + case check_consistency(Node, OTP, Rabbit, Protocol, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} end; - {OTP, Rabbit, {ok, Status}} -> - case check_consistency(OTP, Rabbit) of + {OTP, Rabbit, Protocol, {ok, Status}} -> + case check_consistency(Node, OTP, Rabbit, Protocol) of {error, _} = E -> E; ok -> {ok, Status} - end; - {_OTP, Rabbit, _Hash, _Status} -> - %% delegate hash checking implies version mismatch - rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit) + end + end. + +remote_node_info(Node) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _} = Error -> Error; + %% RabbitMQ prior to 3.6.2 + {OTP, Rabbit, Status} -> {OTP, Rabbit, unsupported, Status}; + %% RabbitMQ 3.6.2 or later + {OTP, Rabbit, Protocol, Status} -> {OTP, Rabbit, Protocol, Status} end. + %%-------------------------------------------------------------------- %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- @@ -763,14 +775,14 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit) -> +check_consistency(Node, OTP, Rabbit, ProtocolVersion) -> rabbit_misc:sequence_error( - [rabbit_version:check_otp_consistency(OTP), + [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP), check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Node, Status) -> +check_consistency(Node, OTP, Rabbit, ProtocolVersion, Status) -> rabbit_misc:sequence_error( - [rabbit_version:check_otp_consistency(OTP), + [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP), check_rabbit_consistency(Rabbit), check_nodes_consistency(Node, Status)]). @@ -785,6 +797,55 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> [node(), Node, Node])}} end. +check_mnesia_or_otp_consistency(_Node, unsupported, OTP) -> + rabbit_version:check_otp_consistency(OTP); +check_mnesia_or_otp_consistency(Node, ProtocolVersion, _) -> + check_mnesia_consistency(Node, ProtocolVersion). + +check_mnesia_consistency(Node, ProtocolVersion) -> + % If mnesia is running we will just check protocol version + % If it's not running, we don't want it to join cluster until all checks pass + % so we start it without `dir` env variable to prevent + % joining cluster and/or corrupting data + with_running_or_clean_mnesia(fun() -> + case negotiate_protocol([Node]) of + [Node] -> ok; + [] -> + LocalVersion = mnesia:system_info(protocol_version), + {error, {inconsistent_cluster, + rabbit_misc:format("Mnesia protocol negotiation failed." + " Local version: ~p." + " Remote version ~p", + [LocalVersion, ProtocolVersion])}} + end + end). + +negotiate_protocol([Node]) -> + mnesia_monitor:negotiate_protocol([Node]). + +with_running_or_clean_mnesia(Fun) -> + IsMnesiaRunning = case mnesia:system_info(is_running) of + yes -> true; + no -> false; + stopping -> + ensure_mnesia_not_running(), + false; + starting -> + ensure_mnesia_running(), + true + end, + case IsMnesiaRunning of + true -> Fun(); + false -> + {ok, MnesiaDir} = application:get_env(mnesia, dir), + application:unset_env(mnesia, dir), + mnesia:start(), + Result = Fun(), + application:stop(mnesia), + application:set_env(mnesia, dir, MnesiaDir), + Result + end. + check_rabbit_consistency(Remote) -> rabbit_version:check_version_consistency( rabbit_misc:version(), Remote, "Rabbit", @@ -819,16 +880,20 @@ find_auto_cluster_node([Node | Nodes]) -> "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]), find_auto_cluster_node(Nodes) end, - case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _} = Reason -> Fail("~p~n", [Reason]); + case remote_node_info(Node) of + {badrpc, _} = Reason -> + Fail("~p~n", [Reason]); %% old delegate hash check - {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]); - {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]); - {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of - {error, _} -> Fail("versions ~p~n", - [{OTP, RMQ}]); - ok -> {ok, Node} - end + {_OTP, RMQ, Hash, _} when is_binary(Hash) -> + Fail("version ~s~n", [RMQ]); + {_OTP, _RMQ, _Protocol, {error, _} = E} -> + Fail("~p~n", [E]); + {OTP, RMQ, Protocol, _} -> + case check_consistency(Node, OTP, RMQ, Protocol) of + {error, _} -> Fail("versions ~p~n", + [{OTP, RMQ}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 1f7e521dfd..d04551043e 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -242,8 +242,10 @@ update_policies(VHost) -> fun() -> [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] case catch list(VHost) of - {error, {no_such_vhost, _}} -> - ok; %% [2] + {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> + {[], []}; %% [2] + {'EXIT', Exit} -> + exit(Exit); Policies -> {[update_exchange(X, Policies) || X <- rabbit_exchange:list(VHost)], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d5b090bed4..45dde112a5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -688,12 +688,12 @@ requeue(AckTags, #vqstate { mode = default, State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( - maybe_update_rates( + maybe_update_rates(ui( State3 #vqstate { delta = Delta1, q3 = Q3a, q4 = Q4a, in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}; + len = Len + MsgCount }))))}; requeue(AckTags, #vqstate { mode = lazy, delta = Delta, q3 = Q3, @@ -706,11 +706,11 @@ requeue(AckTags, #vqstate { mode = lazy, State1), MsgCount = length(MsgIds1), {MsgIds1, a(reduce_memory_use( - maybe_update_rates( + maybe_update_rates(ui( State2 #vqstate { delta = Delta1, q3 = Q3a, in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}. + len = Len + MsgCount }))))}. ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = @@ -2124,7 +2124,7 @@ publish_alpha(MsgStatus, State) -> {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. @@ -2161,7 +2161,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> {#msg_status { msg_id = MsgId } = MsgStatus, State1} = msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = - maybe_write_to_disk(true, true, MsgStatus, State1), + maybe_prepare_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). |
