diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 14:30:47 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 14:30:47 +0100 |
| commit | 9c93d407b264e0b93c13fb6c7bcf5c5d82ea6b1c (patch) | |
| tree | 38f5f88a747d9361866089c5273ad764f55593ca /src | |
| parent | 92654d7c845f60dd339e03cca7ccef0c35688976 (diff) | |
| download | rabbitmq-server-git-9c93d407b264e0b93c13fb6c7bcf5c5d82ea6b1c.tar.gz | |
make ssl work
...and handle socket errors
It turns out that for active sockets the messages sent by tcp and ssl
sockets to the controlling process differ gratuitously.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_net.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 146 |
2 files changed, 93 insertions, 75 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index c8514d9094..b6cc28afd1 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - async_recv/3, port_command/2, setopts/2, send/2, close/1, + recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, sockname/1, peername/1, peercert/1]). %%--------------------------------------------------------------------------- @@ -42,6 +42,9 @@ -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). +-spec(recv/1 :: (socket()) -> + {'data', [char()] | binary()} | 'closed' | + rabbit_types:error(any()) | {'other', any()}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). @@ -83,6 +86,23 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) -> getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). +recv(Sock) when ?IS_SSL(Sock) -> + S = Sock#ssl_socket.ssl, + receive + {ssl, S, Data} -> {data, Data}; + {ssl_closed, S} -> closed; + {ssl_error, S, Reason} -> {error, Reason}; + Other -> {other, Other} + end; +recv(Sock) -> + S = Sock, + receive + {tcp, S, Data} -> {data, Data}; + {tcp_closed, S} -> closed; + {tcp_error, S, Reason} -> {error, Reason}; + Other -> {other, Other} + end. + async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e210dba182..4dcb744641 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -252,82 +252,80 @@ recvloop(Deb, State = #v1{sock = Sock, recv_length = Length, buf = Buf}) -> State#v1{buf = [Rest]})) end. -mainloop(Deb, State = #v1{parent = Parent, sock = Sock}) -> - receive - {tcp, Sock, Data} -> - recvloop(Deb, State#v1{buf = [Data | State#v1.buf], - pending_recv = false}); - {tcp_closed, Sock} -> - if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) - end; - {conserve_memory, Conserve} -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); - {channel_closing, ChPid} -> - ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - mainloop(Deb, State); - {'EXIT', Parent, Reason} -> - terminate(io_lib:format("broker forced connection closure " - "with reason '~w'", [Reason]), State), - %% this is what we are expected to do according to - %% http://www.erlang.org/doc/man/sys.html - %% - %% If we wanted to be *really* nice we should wait for a - %% while for clients to close the socket at their end, - %% just as we do in the ordinary error case. However, - %% since this termination is initiated by our parent it is - %% probably more important to exit quickly. - exit(Reason); - {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> - throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_exception(State, Channel, Reason)); - {'DOWN', _MRef, process, ChPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); - terminate_connection -> - State; - handshake_timeout -> - if ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> - mainloop(Deb, State); - true -> - throw({handshake_timeout, State#v1.callback}) - end; - timeout -> - case State#v1.connection_state of - closed -> mainloop(Deb, State); - S -> throw({timeout, S}) - end; - {'$gen_call', From, {shutdown, Explanation}} -> - {ForceTermination, NewState} = terminate(Explanation, State), - gen_server:reply(From, ok), - case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) - end; - {'$gen_call', From, info} -> - gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); - {'$gen_call', From, {info, Items}} -> - gen_server:reply(From, try {ok, infos(Items, State)} - catch Error -> {error, Error} - end), - mainloop(Deb, State); - {'$gen_cast', emit_stats} -> - State1 = internal_emit_stats(State), - mainloop(Deb, State1); - {system, From, Request} -> - sys:handle_system_msg(Request, From, - Parent, ?MODULE, Deb, State); - Other -> - %% internal error -> something worth dying for - exit({unexpected_message, Other}) +mainloop(Deb, State = #v1{sock = Sock}) -> + case rabbit_net:recv(Sock) of + {data, Data} -> recvloop(Deb, State#v1{buf = [Data | State#v1.buf], + pending_recv = false}); + closed -> if State#v1.connection_state =:= closed -> + State; + true -> + throw(connection_closed_abruptly) + end; + {error, Reason} -> throw({inet_error, Reason}); + {other, Other} -> handle_other(Other, Deb, State) end. +handle_other({conserve_memory, Conserve}, Deb, State) -> + recvloop(Deb, internal_conserve_memory(Conserve, State)); +handle_other({channel_closing, ChPid}, Deb, State) -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); +handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), + %% this is what we are expected to do according to + %% http://www.erlang.org/doc/man/sys.html + %% + %% If we wanted to be *really* nice we should wait for a while for + %% clients to close the socket at their end, just as we do in the + %% ordinary error case. However, since this termination is + %% initiated by our parent it is probably more important to exit + %% quickly. + exit(Reason); +handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, + _Deb, _State) -> + throw(E); +handle_other({channel_exit, Channel, Reason}, Deb, State) -> + mainloop(Deb, handle_exception(State, Channel, Reason)); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); +handle_other(terminate_connection, _Deb, State) -> + State; +handle_other(handshake_timeout, Deb, State) + when ?IS_RUNNING(State) orelse + State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed -> + mainloop(Deb, State); +handle_other(handshake_timeout, _Deb, State) -> + throw({handshake_timeout, State#v1.callback}); +handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> + mainloop(Deb, State); +handle_other(timeout, _Deb, #v1{connection_state = S}) -> + throw({timeout, S}); +handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Deb, NewState) + end; +handle_other({'$gen_call', From, info}, Deb, State) -> + gen_server:reply(From, infos(?INFO_KEYS, State)), + mainloop(Deb, State); +handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + gen_server:reply(From, try {ok, infos(Items, State)} + catch Error -> {error, Error} + end), + mainloop(Deb, State); +handle_other({'$gen_cast', emit_stats}, Deb, State) -> + mainloop(Deb, internal_emit_stats(State)); +handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other(Other, _Deb, _State) -> + %% internal error -> something worth dying for + exit({unexpected_message, Other}). + switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), |
