summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-04-07 14:30:47 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-04-07 14:30:47 +0100
commit9c93d407b264e0b93c13fb6c7bcf5c5d82ea6b1c (patch)
tree38f5f88a747d9361866089c5273ad764f55593ca /src
parent92654d7c845f60dd339e03cca7ccef0c35688976 (diff)
downloadrabbitmq-server-git-9c93d407b264e0b93c13fb6c7bcf5c5d82ea6b1c.tar.gz
make ssl work
...and handle socket errors It turns out that for active sockets the messages sent by tcp and ssl sockets to the controlling process differ gratuitously.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_net.erl22
-rw-r--r--src/rabbit_reader.erl146
2 files changed, 93 insertions, 75 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index c8514d9094..b6cc28afd1 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- async_recv/3, port_command/2, setopts/2, send/2, close/1,
+ recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
%%---------------------------------------------------------------------------
@@ -42,6 +42,9 @@
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
+-spec(recv/1 :: (socket()) ->
+ {'data', [char()] | binary()} | 'closed' |
+ rabbit_types:error(any()) | {'other', any()}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
@@ -83,6 +86,23 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) ->
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
+recv(Sock) when ?IS_SSL(Sock) ->
+ S = Sock#ssl_socket.ssl,
+ receive
+ {ssl, S, Data} -> {data, Data};
+ {ssl_closed, S} -> closed;
+ {ssl_error, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end;
+recv(Sock) ->
+ S = Sock,
+ receive
+ {tcp, S, Data} -> {data, Data};
+ {tcp_closed, S} -> closed;
+ {tcp_error, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end.
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e210dba182..4dcb744641 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -252,82 +252,80 @@ recvloop(Deb, State = #v1{sock = Sock, recv_length = Length, buf = Buf}) ->
State#v1{buf = [Rest]}))
end.
-mainloop(Deb, State = #v1{parent = Parent, sock = Sock}) ->
- receive
- {tcp, Sock, Data} ->
- recvloop(Deb, State#v1{buf = [Data | State#v1.buf],
- pending_recv = false});
- {tcp_closed, Sock} ->
- if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
- end;
- {conserve_memory, Conserve} ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
- {channel_closing, ChPid} ->
- ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- mainloop(Deb, State);
- {'EXIT', Parent, Reason} ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
- %% this is what we are expected to do according to
- %% http://www.erlang.org/doc/man/sys.html
- %%
- %% If we wanted to be *really* nice we should wait for a
- %% while for clients to close the socket at their end,
- %% just as we do in the ordinary error case. However,
- %% since this termination is initiated by our parent it is
- %% probably more important to exit quickly.
- exit(Reason);
- {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
- {'DOWN', _MRef, process, ChPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
- terminate_connection ->
- State;
- handshake_timeout ->
- if ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
- true ->
- throw({handshake_timeout, State#v1.callback})
- end;
- timeout ->
- case State#v1.connection_state of
- closed -> mainloop(Deb, State);
- S -> throw({timeout, S})
- end;
- {'$gen_call', From, {shutdown, Explanation}} ->
- {ForceTermination, NewState} = terminate(Explanation, State),
- gen_server:reply(From, ok),
- case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
- end;
- {'$gen_call', From, info} ->
- gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
- {'$gen_call', From, {info, Items}} ->
- gen_server:reply(From, try {ok, infos(Items, State)}
- catch Error -> {error, Error}
- end),
- mainloop(Deb, State);
- {'$gen_cast', emit_stats} ->
- State1 = internal_emit_stats(State),
- mainloop(Deb, State1);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From,
- Parent, ?MODULE, Deb, State);
- Other ->
- %% internal error -> something worth dying for
- exit({unexpected_message, Other})
+mainloop(Deb, State = #v1{sock = Sock}) ->
+ case rabbit_net:recv(Sock) of
+ {data, Data} -> recvloop(Deb, State#v1{buf = [Data | State#v1.buf],
+ pending_recv = false});
+ closed -> if State#v1.connection_state =:= closed ->
+ State;
+ true ->
+ throw(connection_closed_abruptly)
+ end;
+ {error, Reason} -> throw({inet_error, Reason});
+ {other, Other} -> handle_other(Other, Deb, State)
end.
+handle_other({conserve_memory, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({channel_closing, ChPid}, Deb, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
+handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
+ %% http://www.erlang.org/doc/man/sys.html
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
+ _Deb, _State) ->
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, Deb, State) ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
+handle_other(terminate_connection, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, Deb, State)
+ when ?IS_RUNNING(State) orelse
+ State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed ->
+ mainloop(Deb, State);
+handle_other(handshake_timeout, _Deb, State) ->
+ throw({handshake_timeout, State#v1.callback});
+handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+ mainloop(Deb, State);
+handle_other(timeout, _Deb, #v1{connection_state = S}) ->
+ throw({timeout, S});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Deb, NewState)
+ end;
+handle_other({'$gen_call', From, info}, Deb, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Deb, State);
+handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Deb, State);
+handle_other({'$gen_cast', emit_stats}, Deb, State) ->
+ mainloop(Deb, internal_emit_stats(State));
+handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other(Other, _Deb, _State) ->
+ %% internal error -> something worth dying for
+ exit({unexpected_message, Other}).
+
switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),