summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEssien Ita Essien <essiene@gmail.com>2008-12-16 22:45:45 +0100
committerEssien Ita Essien <essiene@gmail.com>2008-12-16 22:45:45 +0100
commit8b3579d93a0a84d9fa483cc9ce349cafd69ae4cd (patch)
tree559a8206295e55628a7122e26780c01f1f312e42 /src
parent306c0ec272bef14b454e8727adc47282cd571d5f (diff)
parent0cc77d81d21200cce22e19eaeb69f8b1d08e0dbe (diff)
downloadrabbitmq-server-git-8b3579d93a0a84d9fa483cc9ce349cafd69ae4cd.tar.gz
Merge ssl_async_recv branch into bug19356
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_heartbeat.erl4
-rw-r--r--src/rabbit_net.erl80
-rw-r--r--src/rabbit_networking.erl15
-rw-r--r--src/rabbit_reader.erl16
-rw-r--r--src/rabbit_writer.erl4
5 files changed, 97 insertions, 22 deletions
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 0a68c9adad..ed0066fe07 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) ->
spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
send_oct, 0,
fun () ->
- catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
end,
erlang:monitor(process, Parent)) end),
@@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
{'DOWN', MonitorRef, process, _Object, _Info} -> ok;
Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
- case inet:getstat(Sock, [StatName]) of
+ case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
if NewStatVal =/= StatVal ->
F({NewStatVal, 0});
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
new file mode 100644
index 0000000000..979ac718c9
--- /dev/null
+++ b/src/rabbit_net.erl
@@ -0,0 +1,80 @@
+-module(rabbit_net).
+-export([
+ async_recv/3,
+ close/1,
+ controlling_process/2,
+ getstat/2,
+ peername/1,
+ port_command/2,
+ send/2,
+ sockname/1
+ ]).
+
+-include("rabbit.hrl").
+
+
+async_recv(Sock, Length, Timeout) when is_record(Sock, rabbit_ssl_socket) ->
+ Pid = self(),
+ Ref = make_ref(),
+
+ Fun = fun() ->
+ case ssl:recv(Sock#rabbit_ssl_socket.ssl, Length, Timeout) of
+ {ok, Data} ->
+ Pid ! {inet_async, Sock, Ref, {ok, Data}};
+ {error, Reason} ->
+ Pid ! {inet_async, Sock, Ref, {error, Reason}}
+ end
+ end,
+
+ spawn(Fun),
+ {ok, Ref};
+
+async_recv(Sock, Length, Timeout) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, Timeout).
+
+close(Sock) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:close(Sock#rabbit_ssl_socket.ssl);
+
+close(Sock) when is_port(Sock) ->
+ gen_tcp:close(Sock).
+
+
+controlling_process(Sock, Pid) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:controlling_process(Sock#rabbit_ssl_socket.ssl, Pid);
+
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+
+
+getstat(Sock, Stats) when is_record(Sock, rabbit_ssl_socket) ->
+ inet:getstat(Sock#rabbit_ssl_socket.tcp, Stats);
+
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+
+
+peername(Sock) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:peername(Sock#rabbit_ssl_socket.ssl);
+
+peername(Sock) when is_port(Sock) ->
+ inet:peername(Sock).
+
+
+port_command(Sock, Data) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:send(Sock#rabbit_ssl_socket.ssl, Data);
+
+port_command(Sock, Data) when is_port(Sock) ->
+ erlang:port_command(Sock, Data).
+
+send(Sock, Data) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:send(Sock#rabbit_ssl_socket.ssl, Data);
+
+send(Sock, Data) when is_port(Sock) ->
+ gen_tcp:send(Sock, Data).
+
+
+sockname(Sock) when is_record(Sock, rabbit_ssl_socket) ->
+ ssl:sockname(Sock#rabbit_ssl_socket.ssl);
+
+sockname(Sock) when is_port(Sock) ->
+ inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 136b5b9c79..66233ed424 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -39,7 +39,7 @@
-export([check_tcp_listener_address/3]).
-export([tcp_listener_started/2, ssl_connection_upgrade/2,
- tcp_listener_stopped/2, start_client/1, start_ssl_client/1]).
+ tcp_listener_stopped/2, start_client/1]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
@@ -171,31 +171,26 @@ on_node_down(Node) ->
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = gen_tcp:controlling_process(Sock, Child),
+ ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
ssl_connection_upgrade(SslOpts, Sock) ->
- {ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
+ {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
PeerIp = inet_parse:ntoa(PeerAddress),
case ssl:ssl_accept(Sock, SslOpts) of
{ok, SslSock} ->
error_logger:info_msg("Upgraded TCP connection from ~s:~p to SSL/TLS~n",
[PeerIp, PeerPort]),
- start_ssl_client(SslSock);
+ RabbitSslSock = #rabbit_ssl_socket{tcp=Sock, ssl=SslSock},
+ start_client(RabbitSslSock);
{error, Reason} ->
error_logger:error_msg("Failed to upgrade TCP connection from ~s:~p to SSL~n",
[PeerIp, PeerPort]),
{error, Reason}
end.
-start_ssl_client(Sock) ->
- {ok, {PeerAddress, PeerPort}} = ssl:peername(Sock),
- PeerIp = inet_parse:ntoa(PeerAddress),
- error_logger:info_msg("Dummy session started for ssl client from ~s:~p~n",
- [PeerIp, PeerPort]).
-
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3f8d7cac5f..678f1e3fbc 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -199,7 +199,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
peername(Sock) ->
try
- {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
+ {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
AddressS = inet_parse:ntoa(Address),
{AddressS, Port}
catch
@@ -316,7 +316,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- Ref = inet_op(fun () -> prim_inet:async_recv(
+ Ref = inet_op(fun () -> rabbit_net:async_recv(
OldState#v1.sock, Length, -1) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -523,7 +523,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> gen_tcp:send(
+ ok = inet_op(fun () -> rabbit_net:send(
Sock, <<"AMQP",1,1,
?PROTOCOL_VERSION_MAJOR,
?PROTOCOL_VERSION_MINOR>>) end),
@@ -659,23 +659,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
self();
i(address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:sockname(Sock),
+ {ok, {A, _}} = rabbit_net:sockname(Sock),
A;
i(port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:sockname(Sock),
+ {ok, {_, P}} = rabbit_net:sockname(Sock),
P;
i(peer_address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:peername(Sock),
+ {ok, {A, _}} = rabbit_net:peername(Sock),
A;
i(peer_port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:peername(Sock),
+ {ok, {_, P}} = rabbit_net:peername(Sock),
P;
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- case inet:getstat(Sock, [SockStat]) of
+ case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, StatVal}]} -> StatVal;
{error, einval} -> undefined;
{error, Error} -> throw({cannot_get_socket_stats, Error})
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 9cf9f8aef9..5ca294b762 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -139,7 +139,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
- fun () -> gen_tcp:send(Sock, Data) end).
+ fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
@@ -176,6 +176,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
ok.
port_cmd(Sock, Data) ->
- try erlang:port_command(Sock, Data)
+ try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end.