summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-18 13:43:45 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-18 13:43:45 +0000
commit85506c113f88092ffd4f544a43123715f5d9212f (patch)
tree8c904c4591274e0c23126a890dcbc95cb86319ba /src
parenta20c6cd9018645d65f7ee7009208feb4ebf8f9e0 (diff)
parent7a7c1a3c0757353319c17a9567a6340185905491 (diff)
downloadrabbitmq-server-git-85506c113f88092ffd4f544a43123715f5d9212f.tar.gz
Merge bug25722
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl11
-rw-r--r--src/rabbit_control_main.erl10
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mnesia.erl50
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_nodes.erl76
-rw-r--r--src/rabbit_prelaunch.erl73
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/tcp_acceptor.erl4
9 files changed, 169 insertions, 73 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b7e8e7a615..a47a15ca4e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -394,7 +394,8 @@ status() ->
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
{memory, rabbit_vm:memory()},
- {alarms, alarms()}],
+ {alarms, alarms()},
+ {listeners, listeners()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
@@ -424,6 +425,14 @@ alarms() ->
%% [{{resource_limit,memory,rabbit@mercurio},[]}]
[Limit || {{resource_limit, Limit, Node}, _} <- Alarms, Node =:= N].
+listeners() ->
+ [{Protocol, Port, rabbit_misc:ntoa(IP)} ||
+ #listener{node = Node,
+ protocol = Protocol,
+ ip_address = IP,
+ port = Port} <- rabbit_networking:active_listeners(),
+ Node =:= node()].
+
is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6d6f8e1cb3..f9e5907858 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -192,7 +192,11 @@ start() ->
rabbit_misc:quit(2);
{badrpc, Reason} ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
- print_badrpc_diagnostics(Node),
+ print_badrpc_diagnostics([Node]),
+ rabbit_misc:quit(2);
+ {badrpc_multi, Reason, Nodes} ->
+ print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]),
+ print_badrpc_diagnostics(Nodes),
rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
@@ -220,8 +224,8 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
-print_badrpc_diagnostics(Node) ->
- fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
+print_badrpc_diagnostics(Nodes) ->
+ fmt_stderr(rabbit_nodes:diagnostics(Nodes), []).
stop() ->
ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9d44fcaf75..2e87a8f6e1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -542,9 +542,11 @@ tcp_name(Prefix, IPAddress, Port)
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
-format_inet_error(address) -> "cannot connect to host/port";
-format_inet_error(timeout) -> "timed out";
-format_inet_error(Error) -> inet:format_error(Error).
+format_inet_error(E) -> format("~w (~s)", [E, format_inet_error0(E)]).
+
+format_inet_error0(address) -> "cannot connect to host/port";
+format_inet_error0(timeout) -> "timed out";
+format_inet_error0(Error) -> inet:format_error(Error).
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 521f2e6dbe..baf53712ff 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -129,7 +129,7 @@ init_from_config() ->
{ok, Node} ->
rabbit_log:info("Node '~p' selected for clustering from "
"configuration~n", [Node]),
- {ok, {_, DiscNodes, _}} = discover_cluster(Node),
+ {ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
@@ -160,10 +160,7 @@ join_cluster(DiscoveryNode, NodeType) ->
true -> e(clustering_only_disc_node);
false -> ok
end,
- {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
- {ok, Res} -> Res;
- {error, _} = E -> throw(E)
- end,
+ {ClusterNodes, _, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(ClusterNodes) of
false ->
%% reset the node. this simplifies things and it will be needed in
@@ -229,10 +226,7 @@ change_cluster_node_type(Type) ->
false -> e(not_clustered);
true -> ok
end,
- {_, _, RunningNodes} = case discover_cluster(cluster_nodes(all)) of
- {ok, Status} -> Status;
- {error, _Reason} -> e(cannot_connect_to_cluster)
- end,
+ {_, _, RunningNodes} = discover_cluster(cluster_nodes(all)),
%% We might still be marked as running by a remote node since the
%% information of us going down might not have propagated yet.
Node = case RunningNodes -- [node()] of
@@ -245,11 +239,7 @@ change_cluster_node_type(Type) ->
update_cluster_nodes(DiscoveryNode) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- Status = {AllNodes, _, _} =
- case discover_cluster(DiscoveryNode) of
- {ok, Status0} -> Status0;
- {error, _Reason} -> e(cannot_connect_to_node)
- end,
+ Status = {AllNodes, _, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(AllNodes) of
true ->
%% As in `check_consistency/0', we can safely delete the
@@ -607,22 +597,20 @@ running_disc_nodes() ->
%% Internal helpers
%%--------------------------------------------------------------------
-discover_cluster(Nodes) when is_list(Nodes) ->
- lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
- (Node, {error, _}) -> discover_cluster(Node)
- end, {error, no_nodes_provided}, Nodes);
-discover_cluster(Node) when Node == node() ->
- {error, {cannot_discover_cluster, "Cannot cluster node with itself"}};
-discover_cluster(Node) ->
- OfflineError =
- {error, {cannot_discover_cluster,
- "The nodes provided are either offline or not running"}},
- case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
+discover_cluster(Nodes) ->
+ case lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, _) -> discover_cluster0(Node)
+ end, {error, no_nodes_provided}, Nodes) of
+ {ok, Res} -> Res;
+ {error, E} -> throw(E);
+ {badrpc, Reason} -> throw({badrpc_multi, Reason, Nodes})
end.
+discover_cluster0(Node) when Node == node() ->
+ {error, cannot_cluster_node_with_itself};
+discover_cluster0(Node) ->
+ rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []).
+
schema_ok_or_move() ->
case rabbit_table:check_schema_integrity() of
ok ->
@@ -833,15 +821,9 @@ error_description(resetting_only_disc_node) ->
"Please convert another node of the cluster to a disc node first.";
error_description(not_clustered) ->
"Non-clustered nodes can only be disc nodes.";
-error_description(cannot_connect_to_cluster) ->
- "Could not connect to the cluster nodes present in this node's "
- "status file. If the cluster has changed, you can use the "
- "'update_cluster_nodes' command to point to the new cluster nodes.";
error_description(no_online_cluster_nodes) ->
"Could not find any online cluster nodes. If the cluster has changed, "
"you can use the 'update_cluster_nodes' command.";
-error_description(cannot_connect_to_node) ->
- "Could not connect to the cluster node provided.";
error_description(inconsistent_cluster) ->
"The nodes provided do not have this node as part of the cluster.";
error_description(not_a_cluster_node) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 3a7396eb68..1da977262f 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -120,6 +120,7 @@
%%----------------------------------------------------------------------------
boot() ->
+ ok = record_distribution_listener(),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
@@ -275,6 +276,11 @@ tcp_listener_stopped(Protocol, IPAddress, Port) ->
ip_address = IPAddress,
port = Port}).
+record_distribution_listener() ->
+ {Name, Host} = rabbit_nodes:parts(node()),
+ {port, Port, _Version} = erl_epmd:port_please(Name, Host),
+ tcp_listener_started(clustering, {0,0,0,0,0,0,0,0}, Port).
+
active_listeners() ->
rabbit_misc:dirty_read_all(rabbit_listener).
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 0edb90611a..db3cd08311 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -23,6 +23,7 @@
-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
+-define(TCP_DIAGNOSTIC_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
%% Specs
@@ -58,15 +59,13 @@ names(Hostname) ->
end.
diagnostics(Nodes) ->
- Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
- "nodes in question: ~p~n~n"
- "hosts, their running nodes and ports:", [Nodes]}] ++
- [diagnostics_host(Host) || Host <- Hosts] ++
- diagnostics0(),
+ "attempted to contact: ~p~n", [Nodes]}] ++
+ [diagnostics_node(Node) || Node <- Nodes] ++
+ current_node_details(),
rabbit_misc:format_many(lists:flatten(NodeDiags)).
-diagnostics0() ->
+current_node_details() ->
[{"~ncurrent node details:~n- node name: ~w", [node()]},
case init:get_argument(home) of
{ok, [[Home]]} -> {"- home dir: ~s", [Home]};
@@ -74,15 +73,62 @@ diagnostics0() ->
end,
{"- cookie hash: ~s", [cookie_hash()]}].
-diagnostics_host(Host) ->
- case names(Host) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w (~s)",
- [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]};
- {ok, NamePorts} ->
- {"- ~s: ~p",
- [Host, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
+diagnostics_node(Node) ->
+ {Name, Host} = parts(Node),
+ [{"~s:", [Node]} |
+ case names(Host) of
+ {error, Reason} ->
+ [{" * unable to connect to epmd (port ~s) on ~s: ~s~n",
+ [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}];
+ {ok, NamePorts} ->
+ diagnostics_node0(Name, Host, NamePorts)
+ end].
+
+epmd_port() ->
+ case init:get_argument(epmd_port) of
+ {ok, [[Port | _] | _]} when is_list(Port) -> Port;
+ error -> "4369"
+ end.
+
+diagnostics_node0(Name, Host, NamePorts) ->
+ case [{N, P} || {N, P} <- NamePorts, N =:= Name] of
+ [] ->
+ {SelfName, SelfHost} = parts(node()),
+ Others = [list_to_atom(N) || {N, _} <- NamePorts,
+ N =/= case SelfHost of
+ Host -> SelfName;
+ _ -> never_matches
+ end],
+ [{" * ~s seems not to be running at all", [Name]} |
+ case Others of
+ [] -> [{" * no other nodes on ~s", [Host]}];
+ _ -> [{" * other nodes on ~s: ~p", [Host, Others]}]
+ end];
+ [{Name, Port}] ->
+ [{" * found ~s (port ~b)", [Name, Port]} |
+ case diagnose_connect(Host, Port) of
+ ok ->
+ [{" * TCP connection succeeded~n"
+ " * suggestion: hostname mismatch?~n"
+ " * suggestion: is the cookie set correctly?", []}];
+ {error, Reason} ->
+ [{" * can't establish TCP connection, reason: ~s~n"
+ " * suggestion: blocked by firewall?",
+ [rabbit_misc:format_inet_error(Reason)]}]
+ end]
+ end.
+
+diagnose_connect(Host, Port) ->
+ case inet:gethostbyname(Host) of
+ {ok, #hostent{h_addrtype = Family}} ->
+ case gen_tcp:connect(Host, Port, [Family],
+ ?TCP_DIAGNOSTIC_TIMEOUT) of
+ {ok, Socket} -> gen_tcp:close(Socket),
+ ok;
+ {error, _} = E -> E
+ end;
+ {error, _} = E ->
+ E
end.
make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 29ca984a0f..9e8f15b87e 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -18,10 +18,13 @@
-export([start/0, stop/0]).
+-import(rabbit_misc, [pget/2, pget/3]).
+
-include("rabbit.hrl").
--define(BaseApps, [rabbit]).
+-define(DIST_PORT_NOT_CONFIGURED, 0).
-define(ERROR_CODE, 1).
+-define(DIST_PORT_CONFIGURED, 2).
%%----------------------------------------------------------------------------
%% Specs
@@ -37,9 +40,19 @@
%%----------------------------------------------------------------------------
start() ->
- [NodeStr] = init:get_plain_arguments(),
- ok = duplicate_node_check(NodeStr),
- rabbit_misc:quit(0),
+ case init:get_plain_arguments() of
+ [NodeStr] ->
+ Node = rabbit_nodes:make(NodeStr),
+ {NodeName, NodeHost} = rabbit_nodes:parts(Node),
+ ok = duplicate_node_check(Node, NodeName, NodeHost),
+ ok = dist_port_set_check(),
+ ok = dist_port_use_check(NodeHost);
+ [] ->
+ %% Ignore running node while installing windows service
+ ok = dist_port_set_check(),
+ ok
+ end,
+ rabbit_misc:quit(?DIST_PORT_NOT_CONFIGURED),
ok.
stop() ->
@@ -48,12 +61,7 @@ stop() ->
%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
-duplicate_node_check([]) ->
- %% Ignore running node while installing windows service
- ok;
-duplicate_node_check(NodeStr) ->
- Node = rabbit_nodes:make(NodeStr),
- {NodeName, NodeHost} = rabbit_nodes:parts(Node),
+duplicate_node_check(Node, NodeName, NodeHost) ->
case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
@@ -65,8 +73,47 @@ duplicate_node_check(NodeStr) ->
false -> ok
end;
{error, EpmdReason} ->
- io:format("ERROR: epmd error for host ~p: ~p (~s)~n",
- [NodeHost, EpmdReason,
- rabbit_misc:format_inet_error(EpmdReason)]),
+ io:format("ERROR: epmd error for host ~s: ~s~n",
+ [NodeHost, rabbit_misc:format_inet_error(EpmdReason)]),
rabbit_misc:quit(?ERROR_CODE)
end.
+
+dist_port_set_check() ->
+ case os:getenv("RABBITMQ_CONFIG_FILE") of
+ false ->
+ ok;
+ File ->
+ case file:consult(File ++ ".config") of
+ {ok, [Config]} ->
+ Kernel = pget(kernel, Config, []),
+ case {pget(inet_dist_listen_min, Kernel, none),
+ pget(inet_dist_listen_max, Kernel, none)} of
+ {none, none} -> ok;
+ _ -> rabbit_misc:quit(?DIST_PORT_CONFIGURED)
+ end;
+ {error, _} ->
+ %% TODO can we present errors more nicely here
+ %% than after -config has failed?
+ ok
+ end
+ end.
+
+dist_port_use_check(NodeHost) ->
+ case os:getenv("RABBITMQ_DIST_PORT") of
+ false -> ok;
+ PortStr -> Port = list_to_integer(PortStr),
+ case gen_tcp:listen(Port, [inet]) of
+ {ok, Sock} -> gen_tcp:close(Sock);
+ {error, _} -> dist_port_use_check_fail(Port, NodeHost)
+ end
+ end.
+
+dist_port_use_check_fail(Port, Host) ->
+ {ok, Names} = rabbit_nodes:names(Host),
+ case [N || {N, P} <- Names, P =:= Port] of
+ [] -> io:format("ERROR: distribution port ~b in use on ~s "
+ "(by non-Erlang process?)~n", [Port, Host]);
+ [Name] -> io:format("ERROR: distribution port ~b in use by ~s@~s~n",
+ [Port, Name, Host])
+ end,
+ rabbit_misc:quit(?ERROR_CODE).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e2771d82ad..89cfc312d4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -188,8 +188,8 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p: ~p (~s)~n",
- [self(), Reason, rabbit_misc:format_inet_error(Reason)]).
+ log(error, "error on AMQP connection ~p: ~s~n",
+ [self(), rabbit_misc:format_inet_error(Reason)]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index f95118a38c..047b85c5a3 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -63,9 +63,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock),
error_logger:error_msg(
"failed to tune buffer size of "
- "connection accepted on ~s:~p - ~p (~s)~n",
+ "connection accepted on ~s:~p - ~s~n",
[rabbit_misc:ntoab(IPAddress), Port,
- Err, rabbit_misc:format_inet_error(Err)]),
+ rabbit_misc:format_inet_error(Err)]),
catch port_close(Sock)
end,