diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-04 13:38:29 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-04 13:38:29 +0100 |
| commit | 7624bf5f6ec5b90ac30e32c923677988976cefc4 (patch) | |
| tree | 8eb288fd6125e37d3009d1e73892fac8fee300c0 | |
| parent | ff5544e78cee7bb82a93923bac9633d75a57f143 (diff) | |
| parent | 7ec27ca5a43f58a408176c225d54bf13f910fb72 (diff) | |
| download | rabbitmq-server-git-7624bf5f6ec5b90ac30e32c923677988976cefc4.tar.gz | |
merge bug25119
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 5 |
4 files changed, 73 insertions, 44 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ef43d96e5f..964c3e2415 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -80,14 +80,12 @@ synchronised }). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -info(QPid) -> - gen_server2:call(QPid, info, infinity). +info(QPid) -> gen_server2:call(QPid, info, infinity). init(#amqqueue { name = QueueName } = Q) -> %% We join the GM group before we add ourselves to the amqqueue @@ -351,14 +349,10 @@ prioritise_info(Msg, _State) -> %% GM %% --------------------------------------------------------------------------- -joined([SPid], _Members) -> - SPid ! {joined, self()}, - ok. +joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> - ok; -members_changed([SPid], _Births, Deaths) -> - inform_deaths(SPid, Deaths). +members_changed([_SPid], _Births, []) -> ok; +members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). handle_msg([_SPid], _From, master_changed) -> ok; @@ -675,26 +669,24 @@ maybe_enqueue_message( %% msg_seq_no was at the time. We do now! ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { sender_queues = SQ1, - msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. - case needs_confirming(Delivery, State1) of - never -> - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - eventually -> - State1 #state { - msg_id_status = - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 } - end; + {MS1, SQ1} = + case needs_confirming(Delivery, State1) of + never -> {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)}; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + {dict:store(MsgId, MMS, MS), SQ}; + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)} + end, + State1 #state { msg_id_status = MS1, + sender_queues = SQ1 }; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. @@ -743,18 +735,17 @@ process_instruction( msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }, _EnqueueOnPromotion}}, MQ2} -> - %% We received the msg from the channel first. Thus we - %% need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> - {MQ2, PendingCh, MS}; - eventually -> - {MQ2, PendingCh, - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - {MQ2, PendingCh, MS} - end; + {MQ2, PendingCh, + %% We received the msg from the channel first. Thus + %% we need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> MS; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + dict:store(MsgId, MMS , MS); + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + MS + end}; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 698a7c9a0f..038154c36e 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -77,6 +77,8 @@ %%--------------------------------------------------------------------------- +-define(SSL_CLOSE_TIMEOUT, 5000). + -define(IS_SSL(Sock), is_record(Sock, ssl_socket)). is_ssl(Sock) -> ?IS_SSL(Sock). @@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). -fast_close(Sock) when ?IS_SSL(Sock) -> ok; -fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok. +fast_close(Sock) when ?IS_SSL(Sock) -> + %% We cannot simply port_close the underlying tcp socket since the + %% TLS protocol is quite insistent that a proper closing handshake + %% should take place (see RFC 5245 s7.2.1). So we call ssl:close + %% instead, but that can block for a very long time, e.g. when + %% there is lots of pending output and there is tcp backpressure, + %% or the ssl_connection process has entered the the + %% workaround_transport_delivery_problems function during + %% termination, which, inexplicably, does a gen_tcp:recv(Socket, + %% 0), which may never return if the client doesn't send a FIN or + %% that gets swallowed by the network. Since there is no timeout + %% variant of ssl:close, we construct our own. + {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end), + erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}), + receive + {Pid, ssl_close_timeout} -> + erlang:demonitor(MRef, [flush]), + exit(Pid, kill); + {'DOWN', MRef, process, Pid, _Reason} -> + ok + end, + catch port_close(Sock#ssl_socket.tcp), + ok; +fast_close(Sock) when is_port(Sock) -> + catch port_close(Sock), ok. sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 94a5a2b79d..2d0ded1239 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; + {error, timeout} -> + {error, {ssl_upgrade_error, timeout}}; {error, Reason} -> + %% We have no idea what state the ssl_connection + %% process is in - it could still be happily + %% going, it might be stuck, or it could be just + %% about to fail. There is little that our caller + %% can do but close the TCP socket, but this could + %% cause ssl alerts to get dropped (which is bad + %% form, according to the TLS spec). So we give + %% the ssl_connection a little bit of time to send + %% such alerts. + timer:sleep(?SSL_TIMEOUT * 1000), {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bd20deb22d..aef48b2030 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -184,6 +184,8 @@ socket_op(Sock, Fun) -> {ok, Res} -> Res; {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", [self(), Reason]), + %% NB: this is tcp socket, even in case of ssl + rabbit_net:fast_close(Sock), exit(normal) end. @@ -242,8 +244,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, %% controlling process and hence its termination will close %% the socket. However, to keep the file_handle_cache %% accounting as accurate as possible we ought to close the - %% socket w/o delay before termination. fast_close does that, - %% though only for non-ssl sockets. + %% socket w/o delay before termination. rabbit_net:fast_close(ClientSock), rabbit_event:notify(connection_closed, [{pid, self()}]) end, |
