summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-04 13:38:29 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-09-04 13:38:29 +0100
commit7624bf5f6ec5b90ac30e32c923677988976cefc4 (patch)
tree8eb288fd6125e37d3009d1e73892fac8fee300c0
parentff5544e78cee7bb82a93923bac9633d75a57f143 (diff)
parent7ec27ca5a43f58a408176c225d54bf13f910fb72 (diff)
downloadrabbitmq-server-git-7624bf5f6ec5b90ac30e32c923677988976cefc4.tar.gz
merge bug25119
-rw-r--r--src/rabbit_mirror_queue_slave.erl71
-rw-r--r--src/rabbit_net.erl29
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_reader.erl5
4 files changed, 73 insertions, 44 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ef43d96e5f..964c3e2415 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -80,14 +80,12 @@
synchronised
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
%% We join the GM group before we add ourselves to the amqqueue
@@ -351,14 +349,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -675,26 +669,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -743,18 +735,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS , MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 698a7c9a0f..038154c36e 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -77,6 +77,8 @@
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 94a5a2b79d..2d0ded1239 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, timeout} ->
+ {error, {ssl_upgrade_error, timeout}};
{error, Reason} ->
+ %% We have no idea what state the ssl_connection
+ %% process is in - it could still be happily
+ %% going, it might be stuck, or it could be just
+ %% about to fail. There is little that our caller
+ %% can do but close the TCP socket, but this could
+ %% cause ssl alerts to get dropped (which is bad
+ %% form, according to the TLS spec). So we give
+ %% the ssl_connection a little bit of time to send
+ %% such alerts.
+ timer:sleep(?SSL_TIMEOUT * 1000),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bd20deb22d..aef48b2030 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -184,6 +184,8 @@ socket_op(Sock, Fun) ->
{ok, Res} -> Res;
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
[self(), Reason]),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
@@ -242,8 +244,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% controlling process and hence its termination will close
%% the socket. However, to keep the file_handle_cache
%% accounting as accurate as possible we ought to close the
- %% socket w/o delay before termination. fast_close does that,
- %% though only for non-ssl sockets.
+ %% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,