diff options
| -rw-r--r-- | src/mirrored_supervisor.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_client_sup.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 150 | ||||
| -rw-r--r-- | src/rabbit_restartable_sup.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 14 | ||||
| -rw-r--r-- | src/tcp_acceptor_sup.erl | 6 | ||||
| -rw-r--r-- | src/tcp_listener.erl | 9 | ||||
| -rw-r--r-- | src/tcp_listener_sup.erl | 12 | ||||
| -rw-r--r-- | src/worker_pool.erl | 7 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 10 |
11 files changed, 124 insertions, 118 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 6e8f96d9a4..3ba8f50d2e 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -144,32 +144,17 @@ -type child() :: pid() | 'undefined'. -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. -type modules() :: [module()] | 'dynamic'. --type restart() :: 'permanent' | 'transient' | 'temporary'. --type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). --type child_spec() :: {Id :: child_id(), - StartFunc :: mfargs(), - Restart :: restart(), - Shutdown :: shutdown(), - Type :: worker(), - Modules :: modules()}. -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type startchild_err() :: 'already_present' - | {'already_started', Child :: child()} | term(). --type startchild_ret() :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', startchild_err()}. - -type group_name() :: any(). -spec start_link(GroupName, Module, Args) -> startlink_ret() when @@ -183,9 +168,9 @@ Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> startchild_ret() when +-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when SupRef :: sup_ref(), - ChildSpec :: child_spec() | (List :: [term()]). + ChildSpec :: supervisor:child_spec() | (List :: [term()]). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), @@ -215,12 +200,12 @@ Modules :: modules(). -spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: 'ok' | {'error', Error :: term()}. -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: startlink_ret(). -spec create_tables() -> Result when diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index fd03ca85b3..517dd4ec60 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -31,10 +31,9 @@ -ifdef(use_specs). --type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dfb400e37f..4ba01b4f4d 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,8 +28,9 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/2 :: ({'local', atom()}, mfa()) -> +-spec(start_link/1 :: (rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 045ab89a70..e81f8134f8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -24,7 +24,7 @@ close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2, +-export([tcp_listener_addresses/1, tcp_listener_spec/6, ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, @@ -47,12 +47,16 @@ -export_type([ip_port/0, hostname/0]). -type(hostname() :: inet:hostname()). --type(ip_port() :: inet:ip_port()). +-type(ip_port() :: inet:port_number()). -type(family() :: atom()). -type(listener_config() :: ip_port() | {hostname(), ip_port()} | {hostname(), ip_port(), family()}). +-type(address() :: {inet:ip_address(), ip_port(), family()}). +-type(name_prefix() :: atom()). +-type(protocol() :: atom()). +-type(label() :: string()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). @@ -76,8 +80,10 @@ -spec(force_connection_event_refresh/0 :: () -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). +-spec(tcp_listener_spec/6 :: + (name_prefix(), address(), [gen_tcp:listen_option()], protocol(), + label(), rabbit_types:mfargs()) -> supervisor:child_spec()). -spec(ensure_ssl/0 :: () -> rabbit_types:infos()). -spec(ssl_transform_fun/1 :: (rabbit_types:infos()) @@ -140,39 +146,6 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -%% inet_parse:address takes care of ip string, like "0.0.0.0" -%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, -%% and runs 'inet_gethost' port process for dns lookups. -%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. - -getaddr(Host, Family) -> - case inet_parse:address(Host) of - {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; - {error, _} -> gethostaddr(Host, Family) - end. - -gethostaddr(Host, auto) -> - Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], - case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of - [] -> host_lookup_error(Host, Lookups); - IPs -> IPs - end; - -gethostaddr(Host, Family) -> - case inet:getaddr(Host, Family) of - {ok, IPAddress} -> [{IPAddress, Family}]; - {error, Reason} -> host_lookup_error(Host, Reason) - end. - -host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}). - -resolve_family({_,_,_,_}, auto) -> inet; -resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; -resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); -resolve_family(_, F) -> F. - ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), @@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) -> end end. -check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {"auto", Port}) -> +tcp_listener_addresses(Port) when is_integer(Port) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({"auto", Port}) -> %% Variant to prevent lots of hacking around in bash and batch files - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {Host, Port}) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({Host, Port}) -> %% auto: determine family IPv4 / IPv6 after converting to IP address - check_tcp_listener_address(NamePrefix, {Host, Port, auto}); - -check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> - if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; - true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", - [Port]), - throw({error, {invalid_port, Port}}) - end, - [{IPAddress, Port, Family, - rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || - {IPAddress, Family} <- getaddr(Host, Family0)]. - -check_tcp_listener_address_auto(NamePrefix, Port) -> - lists:append([check_tcp_listener_address(NamePrefix, Listener) || + tcp_listener_addresses({Host, Port, auto}); +tcp_listener_addresses({Host, Port, Family0}) + when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> + [{IPAddress, Port, Family} || + {IPAddress, Family} <- getaddr(Host, Family0)]; +tcp_listener_addresses({_Host, Port, _Family0}) -> + error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + throw({error, {invalid_port, Port}}). + +tcp_listener_addresses_auto(Port) -> + lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, + Protocol, Label, OnConnect) -> + {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {tcp_listener_sup, start_link, + [IPAddress, Port, [Family | SocketOpts], + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + start_tcp_listener(Listener) -> start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). @@ -235,27 +213,21 @@ start_ssl_listener(Listener, SslOpts) -> {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Listener, Protocol, Label, OnConnect) -> - [start_listener0(Spec, Protocol, Label, OnConnect) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [start_listener0(Address, Protocol, Label, OnConnect) || + Address <- tcp_listener_addresses(Listener)], ok. -start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> - {ok,_} = supervisor:start_child( - rabbit_sup, - {Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | tcp_opts()], - {?MODULE, tcp_listener_started, [Protocol]}, - {?MODULE, tcp_listener_stopped, [Protocol]}, - OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}). +start_listener0(Address, Protocol, Label, OnConnect) -> + Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), + Protocol, Label, OnConnect), + {ok,_} = supervisor:start_child(rabbit_sup, Spec). stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Spec) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [stop_tcp_listener0(Address) || + Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). @@ -363,6 +335,38 @@ tcp_opts() -> {ok, Opts} = application:get_env(rabbit, tcp_listen_options), Opts. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +getaddr(Host, Family) -> + case inet_parse:address(Host) of + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) + end. + +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index cda3ccbe0f..1a08efed41 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -28,7 +28,8 @@ -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2db960acc9..ae2b5d3f46 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -28,12 +28,9 @@ binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, internal_user/0, - username/0, password/0, password_hash/0, ok/1, error/1, - ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0]). - --type(channel_exit() :: no_return()). --type(connection_exit() :: no_return()). + username/0, password/0, password_hash/0, + ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, + channel_exit/0, connection_exit/0, mfargs/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +153,9 @@ -type(ok_or_error2(A, B) :: ok(A) | error(B)). -type(ok_pid_or_error() :: ok_or_error2(pid(), any())). +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -endif. % use_specs diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index 4c835598e0..cb3dd02c42 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -25,7 +25,11 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + +-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index ad2a0d02d0..9a82ac88c1 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -28,9 +28,14 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), - atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + integer(), atom(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). + -endif. %%-------------------------------------------------------------------- diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 5bff5c2701..74297b6dce 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -26,12 +26,16 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/7 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), integer(), string()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 456ff39f47..fcb07a16f4 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -37,10 +37,11 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/1 :: - (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 78ab4df3ea..b42530e269 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -29,12 +29,12 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). --spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/2 :: - (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). --spec(run/1 :: (fun (() -> A)) -> A; - ({atom(), atom(), [any()]}) -> any()). +-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. |
