summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-04-20 14:21:13 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-04-20 14:21:13 +0100
commit8f4cfceb8ad933d453ab692e6386560e18c1960b (patch)
treeb8964a4d8dd36f6a8dd6801a8e63f31ee3b8bfaf /src
parentc15a4adf07441c7cd6f3819c296c192f4a54c5e4 (diff)
parentb809d2e314d523aa578bbed637ce4a35afb980c6 (diff)
downloadrabbitmq-server-git-8f4cfceb8ad933d453ab692e6386560e18c1960b.tar.gz
Merge branch 'stable' into rabbitmq-auth-backend-ldap-13
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mnesia.erl111
-rw-r--r--src/rabbit_policy.erl6
-rw-r--r--src/rabbit_variable_queue.erl12
3 files changed, 98 insertions, 31 deletions
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index afd0508aac..6a57f6bb2c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -423,6 +423,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{rabbit_misc:otp_release(), rabbit_misc:version(),
+ mnesia:system_info(protocol_version),
cluster_status_from_mnesia()}.
node_type() ->
@@ -593,26 +594,37 @@ check_cluster_consistency() ->
end.
check_cluster_consistency(Node, CheckNodesConsistency) ->
- case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ case remote_node_info(Node) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, {error, _}} ->
+ {_OTP, Rabbit, DelegateModuleHash, _Status} when is_binary(DelegateModuleHash) ->
+ %% when a delegate module .beam file hash is present
+ %% in the tuple, we are dealing with an old version
+ rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit);
+ {_OTP, _Rabbit, _Protocol, {error, _}} ->
{error, not_found};
- {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency ->
- case check_consistency(OTP, Rabbit, Node, Status) of
+ {OTP, Rabbit, Protocol, {ok, Status}} when CheckNodesConsistency ->
+ case check_consistency(Node, OTP, Rabbit, Protocol, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
end;
- {OTP, Rabbit, {ok, Status}} ->
- case check_consistency(OTP, Rabbit) of
+ {OTP, Rabbit, Protocol, {ok, Status}} ->
+ case check_consistency(Node, OTP, Rabbit, Protocol) of
{error, _} = E -> E;
ok -> {ok, Status}
- end;
- {_OTP, Rabbit, _Hash, _Status} ->
- %% delegate hash checking implies version mismatch
- rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit)
+ end
+ end.
+
+remote_node_info(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _} = Error -> Error;
+ %% RabbitMQ prior to 3.6.2
+ {OTP, Rabbit, Status} -> {OTP, Rabbit, unsupported, Status};
+ %% RabbitMQ 3.6.2 or later
+ {OTP, Rabbit, Protocol, Status} -> {OTP, Rabbit, Protocol, Status}
end.
+
%%--------------------------------------------------------------------
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
@@ -763,14 +775,14 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit) ->
+check_consistency(Node, OTP, Rabbit, ProtocolVersion) ->
rabbit_misc:sequence_error(
- [rabbit_version:check_otp_consistency(OTP),
+ [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
check_rabbit_consistency(Rabbit)]).
-check_consistency(OTP, Rabbit, Node, Status) ->
+check_consistency(Node, OTP, Rabbit, ProtocolVersion, Status) ->
rabbit_misc:sequence_error(
- [rabbit_version:check_otp_consistency(OTP),
+ [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
check_rabbit_consistency(Rabbit),
check_nodes_consistency(Node, Status)]).
@@ -785,6 +797,55 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
[node(), Node, Node])}}
end.
+check_mnesia_or_otp_consistency(_Node, unsupported, OTP) ->
+ rabbit_version:check_otp_consistency(OTP);
+check_mnesia_or_otp_consistency(Node, ProtocolVersion, _) ->
+ check_mnesia_consistency(Node, ProtocolVersion).
+
+check_mnesia_consistency(Node, ProtocolVersion) ->
+ % If mnesia is running we will just check protocol version
+ % If it's not running, we don't want it to join cluster until all checks pass
+ % so we start it without `dir` env variable to prevent
+ % joining cluster and/or corrupting data
+ with_running_or_clean_mnesia(fun() ->
+ case negotiate_protocol([Node]) of
+ [Node] -> ok;
+ [] ->
+ LocalVersion = mnesia:system_info(protocol_version),
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Mnesia protocol negotiation failed."
+ " Local version: ~p."
+ " Remote version ~p",
+ [LocalVersion, ProtocolVersion])}}
+ end
+ end).
+
+negotiate_protocol([Node]) ->
+ mnesia_monitor:negotiate_protocol([Node]).
+
+with_running_or_clean_mnesia(Fun) ->
+ IsMnesiaRunning = case mnesia:system_info(is_running) of
+ yes -> true;
+ no -> false;
+ stopping ->
+ ensure_mnesia_not_running(),
+ false;
+ starting ->
+ ensure_mnesia_running(),
+ true
+ end,
+ case IsMnesiaRunning of
+ true -> Fun();
+ false ->
+ {ok, MnesiaDir} = application:get_env(mnesia, dir),
+ application:unset_env(mnesia, dir),
+ mnesia:start(),
+ Result = Fun(),
+ application:stop(mnesia),
+ application:set_env(mnesia, dir, MnesiaDir),
+ Result
+ end.
+
check_rabbit_consistency(Remote) ->
rabbit_version:check_version_consistency(
rabbit_misc:version(), Remote, "Rabbit",
@@ -819,16 +880,20 @@ find_auto_cluster_node([Node | Nodes]) ->
"Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]),
find_auto_cluster_node(Nodes)
end,
- case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _} = Reason -> Fail("~p~n", [Reason]);
+ case remote_node_info(Node) of
+ {badrpc, _} = Reason ->
+ Fail("~p~n", [Reason]);
%% old delegate hash check
- {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]);
- {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]);
- {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of
- {error, _} -> Fail("versions ~p~n",
- [{OTP, RMQ}]);
- ok -> {ok, Node}
- end
+ {_OTP, RMQ, Hash, _} when is_binary(Hash) ->
+ Fail("version ~s~n", [RMQ]);
+ {_OTP, _RMQ, _Protocol, {error, _} = E} ->
+ Fail("~p~n", [E]);
+ {OTP, RMQ, Protocol, _} ->
+ case check_consistency(Node, OTP, RMQ, Protocol) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, RMQ}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 1f7e521dfd..d04551043e 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -242,8 +242,10 @@ update_policies(VHost) ->
fun() ->
[mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
case catch list(VHost) of
- {error, {no_such_vhost, _}} ->
- ok; %% [2]
+ {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
+ {[], []}; %% [2]
+ {'EXIT', Exit} ->
+ exit(Exit);
Policies ->
{[update_exchange(X, Policies) ||
X <- rabbit_exchange:list(VHost)],
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d5b090bed4..45dde112a5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -688,12 +688,12 @@ requeue(AckTags, #vqstate { mode = default,
State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
- maybe_update_rates(
+ maybe_update_rates(ui(
State3 #vqstate { delta = Delta1,
q3 = Q3a,
q4 = Q4a,
in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))};
+ len = Len + MsgCount }))))};
requeue(AckTags, #vqstate { mode = lazy,
delta = Delta,
q3 = Q3,
@@ -706,11 +706,11 @@ requeue(AckTags, #vqstate { mode = lazy,
State1),
MsgCount = length(MsgIds1),
{MsgIds1, a(reduce_memory_use(
- maybe_update_rates(
+ maybe_update_rates(ui(
State2 #vqstate { delta = Delta1,
q3 = Q3a,
in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))}.
+ len = Len + MsgCount }))))}.
ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
@@ -2124,7 +2124,7 @@ publish_alpha(MsgStatus, State) ->
{MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
{MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
@@ -2161,7 +2161,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(true, true, MsgStatus, State1),
+ maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
stats({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).