From 135575b3ffc83fa32218278a066b5a872a21ff85 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 13 Sep 2021 11:23:35 +0100 Subject: Stream reader: close osiris logs and sockets in terminate Instead of injecting it into varios places inside the code. When the osiris log is closed it will decrement the global "readers" counter which is why it is much safer to do this in terminate. --- deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 412be10650..e0b4337c87 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -173,11 +173,15 @@ callback_mode() -> [state_functions, state_enter]. -terminate(Reason, State, StatemData) -> +terminate(Reason, State, + #statem_data{transport = Transport, + connection = #stream_connection{socket = Socket}, + connection_state = ConnectionState} = StatemData) -> + close(Transport, Socket, ConnectionState), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(StatemData), - rabbit_log:debug("~p terminating in state '~s' with reason '~p'", - [?MODULE, State, Reason]). + rabbit_log:debug("~s terminating in state '~s' with reason '~W'", + [?MODULE, State, Reason, 10]). start_link(KeepaliveSup, Transport, Ref, Opts) -> {ok, @@ -713,7 +717,6 @@ open(info, {OK, S, Data}, #stream_connection{connection_step = Step} = Connection1, case Step of closing -> - close(Transport, S, State), stop; close_sent -> rabbit_log_connection:debug("Transitioned to close_sent"), @@ -808,7 +811,6 @@ open(info, heartbeat_send, rabbit_log_connection:info("Heartbeat send error ~p, closing connection", [Unexpected]), _C1 = demonitor_all_streams(Connection), - close(Transport, S, State), stop end; open(info, heartbeat_timeout, @@ -817,7 +819,6 @@ open(info, heartbeat_timeout, connection_state = State}) -> rabbit_log_connection:debug("Heartbeat timeout, closing connection"), _C1 = demonitor_all_streams(Connection), - close(Transport, S, State), stop; open(info, {infos, From}, #statem_data{connection = @@ -857,7 +858,6 @@ open({call, From}, {shutdown, Explanation}, rabbit_log_connection:info("Forcing stream connection ~p closing: ~p", [self(), Explanation]), demonitor_all_streams(Connection), - close(Transport, S, State), {stop_and_reply, normal, {reply, From, ok}}; open(cast, {queue_event, _, {osiris_written, _, undefined, CorrelationList}}, @@ -1060,7 +1060,6 @@ close_sent(state_timeout, close, connection_state = State}) -> rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.", [?FUNCTION_NAME]), - close(Transport, Socket, State), stop; close_sent(info, {tcp, S, Data}, #statem_data{transport = Transport, @@ -1075,7 +1074,6 @@ close_sent(info, {tcp, S, Data}, [?FUNCTION_NAME, Step]), case Step of closing_done -> - close(Transport, S, State1), stop; _ -> Transport:setopts(S, [{active, once}]), @@ -1091,7 +1089,6 @@ close_sent(info, {tcp_error, S, Reason}, #statem_data{transport = Transport, connection_state = State}) -> rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]", [Reason, S, self()]), - close(Transport, S, State), stop; close_sent(info, {resource_alarm, IsThereAlarm}, StatemData = #statem_data{connection = Connection}) -> -- cgit v1.2.1