diff options
| author | Essien Ita Essien <essiene@gmail.com> | 2008-12-16 22:45:45 +0100 |
|---|---|---|
| committer | Essien Ita Essien <essiene@gmail.com> | 2008-12-16 22:45:45 +0100 |
| commit | 8b3579d93a0a84d9fa483cc9ce349cafd69ae4cd (patch) | |
| tree | 559a8206295e55628a7122e26780c01f1f312e42 /src | |
| parent | 306c0ec272bef14b454e8727adc47282cd571d5f (diff) | |
| parent | 0cc77d81d21200cce22e19eaeb69f8b1d08e0dbe (diff) | |
| download | rabbitmq-server-git-8b3579d93a0a84d9fa483cc9ce349cafd69ae4cd.tar.gz | |
Merge ssl_async_recv branch into bug19356
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_heartbeat.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 4 |
5 files changed, 97 insertions, 22 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 0a68c9adad..ed0066fe07 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) -> spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue end, erlang:monitor(process, Parent)) end), @@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case inet:getstat(Sock, [StatName]) of + case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> F({NewStatVal, 0}); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl new file mode 100644 index 0000000000..979ac718c9 --- /dev/null +++ b/src/rabbit_net.erl @@ -0,0 +1,80 @@ +-module(rabbit_net). +-export([ + async_recv/3, + close/1, + controlling_process/2, + getstat/2, + peername/1, + port_command/2, + send/2, + sockname/1 + ]). + +-include("rabbit.hrl"). + + +async_recv(Sock, Length, Timeout) when is_record(Sock, rabbit_ssl_socket) -> + Pid = self(), + Ref = make_ref(), + + Fun = fun() -> + case ssl:recv(Sock#rabbit_ssl_socket.ssl, Length, Timeout) of + {ok, Data} -> + Pid ! {inet_async, Sock, Ref, {ok, Data}}; + {error, Reason} -> + Pid ! {inet_async, Sock, Ref, {error, Reason}} + end + end, + + spawn(Fun), + {ok, Ref}; + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +close(Sock) when is_record(Sock, rabbit_ssl_socket) -> + ssl:close(Sock#rabbit_ssl_socket.ssl); + +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock). + + +controlling_process(Sock, Pid) when is_record(Sock, rabbit_ssl_socket) -> + ssl:controlling_process(Sock#rabbit_ssl_socket.ssl, Pid); + +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + + +getstat(Sock, Stats) when is_record(Sock, rabbit_ssl_socket) -> + inet:getstat(Sock#rabbit_ssl_socket.tcp, Stats); + +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + + +peername(Sock) when is_record(Sock, rabbit_ssl_socket) -> + ssl:peername(Sock#rabbit_ssl_socket.ssl); + +peername(Sock) when is_port(Sock) -> + inet:peername(Sock). + + +port_command(Sock, Data) when is_record(Sock, rabbit_ssl_socket) -> + ssl:send(Sock#rabbit_ssl_socket.ssl, Data); + +port_command(Sock, Data) when is_port(Sock) -> + erlang:port_command(Sock, Data). + +send(Sock, Data) when is_record(Sock, rabbit_ssl_socket) -> + ssl:send(Sock#rabbit_ssl_socket.ssl, Data); + +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data). + + +sockname(Sock) when is_record(Sock, rabbit_ssl_socket) -> + ssl:sockname(Sock#rabbit_ssl_socket.ssl); + +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 136b5b9c79..66233ed424 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -39,7 +39,7 @@ -export([check_tcp_listener_address/3]). -export([tcp_listener_started/2, ssl_connection_upgrade/2, - tcp_listener_stopped/2, start_client/1, start_ssl_client/1]). + tcp_listener_stopped/2, start_client/1]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). @@ -171,31 +171,26 @@ on_node_down(Node) -> start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = gen_tcp:controlling_process(Sock, Child), + ok = rabbit_net:controlling_process(Sock, Child), Child ! {go, Sock}, Child. ssl_connection_upgrade(SslOpts, Sock) -> - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), + {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), PeerIp = inet_parse:ntoa(PeerAddress), case ssl:ssl_accept(Sock, SslOpts) of {ok, SslSock} -> error_logger:info_msg("Upgraded TCP connection from ~s:~p to SSL/TLS~n", [PeerIp, PeerPort]), - start_ssl_client(SslSock); + RabbitSslSock = #rabbit_ssl_socket{tcp=Sock, ssl=SslSock}, + start_client(RabbitSslSock); {error, Reason} -> error_logger:error_msg("Failed to upgrade TCP connection from ~s:~p to SSL~n", [PeerIp, PeerPort]), {error, Reason} end. -start_ssl_client(Sock) -> - {ok, {PeerAddress, PeerPort}} = ssl:peername(Sock), - PeerIp = inet_parse:ntoa(PeerAddress), - error_logger:info_msg("Dummy session started for ssl client from ~s:~p~n", - [PeerIp, PeerPort]). - connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3f8d7cac5f..678f1e3fbc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -199,7 +199,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). peername(Sock) -> try - {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), AddressS = inet_parse:ntoa(Address), {AddressS, Port} catch @@ -316,7 +316,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - Ref = inet_op(fun () -> prim_inet:async_recv( + Ref = inet_op(fun () -> rabbit_net:async_recv( OldState#v1.sock, Length, -1) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -523,7 +523,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( + ok = inet_op(fun () -> rabbit_net:send( Sock, <<"AMQP",1,1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>) end), @@ -659,23 +659,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:sockname(Sock), + {ok, {A, _}} = rabbit_net:sockname(Sock), A; i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:sockname(Sock), + {ok, {_, P}} = rabbit_net:sockname(Sock), P; i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:peername(Sock), + {ok, {A, _}} = rabbit_net:peername(Sock), A; i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:peername(Sock), + {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case inet:getstat(Sock, [SockStat]) of + case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{SockStat, StatVal}]} -> StatVal; {error, einval} -> undefined; {error, Error} -> throw({cannot_get_socket_stats, Error}) diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 9cf9f8aef9..5ca294b762 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -139,7 +139,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, - fun () -> gen_tcp:send(Sock, Data) end). + fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). @@ -176,6 +176,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> ok. port_cmd(Sock, Data) -> - try erlang:port_command(Sock, Data) + try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) end. |
