summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mirrored_supervisor.erl23
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_networking.erl150
-rw-r--r--src/rabbit_restartable_sup.erl3
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/tcp_acceptor_sup.erl6
-rw-r--r--src/tcp_listener.erl9
-rw-r--r--src/tcp_listener_sup.erl12
-rw-r--r--src/worker_pool.erl7
-rw-r--r--src/worker_pool_worker.erl10
11 files changed, 124 insertions, 118 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 6e8f96d9a4..3ba8f50d2e 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index fd03ca85b3..517dd4ec60 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -31,10 +31,9 @@
-ifdef(use_specs).
--type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dfb400e37f..4ba01b4f4d 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
--spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+-spec(start_link/1 :: (rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 045ab89a70..e81f8134f8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -24,7 +24,7 @@
close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2,
+-export([tcp_listener_addresses/1, tcp_listener_spec/6,
ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
@@ -47,12 +47,16 @@
-export_type([ip_port/0, hostname/0]).
-type(hostname() :: inet:hostname()).
--type(ip_port() :: inet:ip_port()).
+-type(ip_port() :: inet:port_number()).
-type(family() :: atom()).
-type(listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}).
+-type(address() :: {inet:ip_address(), ip_port(), family()}).
+-type(name_prefix() :: atom()).
+-type(protocol() :: atom()).
+-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
@@ -76,8 +80,10 @@
-spec(force_connection_event_refresh/0 :: () -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
+-spec(tcp_listener_spec/6 ::
+ (name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
+ label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
-spec(ssl_transform_fun/1 ::
(rabbit_types:infos())
@@ -140,39 +146,6 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-%% inet_parse:address takes care of ip string, like "0.0.0.0"
-%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
-%% and runs 'inet_gethost' port process for dns lookups.
-%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
-
-getaddr(Host, Family) ->
- case inet_parse:address(Host) of
- {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
- {error, _} -> gethostaddr(Host, Family)
- end.
-
-gethostaddr(Host, auto) ->
- Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
- case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
- [] -> host_lookup_error(Host, Lookups);
- IPs -> IPs
- end;
-
-gethostaddr(Host, Family) ->
- case inet:getaddr(Host, Family) of
- {ok, IPAddress} -> [{IPAddress, Family}];
- {error, Reason} -> host_lookup_error(Host, Reason)
- end.
-
-host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}}).
-
-resolve_family({_,_,_,_}, auto) -> inet;
-resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
-resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
-resolve_family(_, F) -> F.
-
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
@@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) ->
end
end.
-check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
- check_tcp_listener_address(NamePrefix, {Host, Port, auto});
-
-check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
- if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
- true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
- [Port]),
- throw({error, {invalid_port, Port}})
- end,
- [{IPAddress, Port, Family,
- rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
- {IPAddress, Family} <- getaddr(Host, Family0)].
-
-check_tcp_listener_address_auto(NamePrefix, Port) ->
- lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Protocol, Label, OnConnect) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, [Family | SocketOpts],
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
+ OnConnect, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
start_tcp_listener(Listener) ->
start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
@@ -235,27 +213,21 @@ start_ssl_listener(Listener, SslOpts) ->
{?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Listener, Protocol, Label, OnConnect) ->
- [start_listener0(Spec, Protocol, Label, OnConnect) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [start_listener0(Address, Protocol, Label, OnConnect) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
- {ok,_} = supervisor:start_child(
- rabbit_sup,
- {Name,
- {tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | tcp_opts()],
- {?MODULE, tcp_listener_started, [Protocol]},
- {?MODULE, tcp_listener_stopped, [Protocol]},
- OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}).
+start_listener0(Address, Protocol, Label, OnConnect) ->
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
+ Protocol, Label, OnConnect),
+ {ok,_} = supervisor:start_child(rabbit_sup, Spec).
stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Spec) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
@@ -363,6 +335,38 @@ tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index cda3ccbe0f..1a08efed41 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -28,7 +28,8 @@
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2db960acc9..ae2b5d3f46 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -28,12 +28,9 @@
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, internal_user/0,
- username/0, password/0, password_hash/0, ok/1, error/1,
- ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0]).
-
--type(channel_exit() :: no_return()).
--type(connection_exit() :: no_return()).
+ username/0, password/0, password_hash/0,
+ ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
+ channel_exit/0, connection_exit/0, mfargs/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +153,9 @@
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-endif. % use_specs
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index 4c835598e0..cb3dd02c42 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -25,7 +25,11 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
+-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index ad2a0d02d0..9a82ac88c1 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -28,9 +28,14 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
- atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ integer(), atom(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
+
-endif.
%%--------------------------------------------------------------------
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 5bff5c2701..74297b6dce 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -26,12 +26,16 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/7 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), integer(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 456ff39f47..fcb07a16f4 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -37,10 +37,11 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/1 ::
- (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 78ab4df3ea..b42530e269 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -29,12 +29,12 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
--spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/2 ::
- (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
--spec(run/1 :: (fun (() -> A)) -> A;
- ({atom(), atom(), [any()]}) -> any()).
+-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.