diff options
| -rw-r--r-- | src/rabbit_channel.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 |
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d5a0ca38a7..cc0192b0b4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -306,18 +306,19 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), - case Reason of - normal -> ok = Res; - shutdown -> ok = Res; - {shutdown, _Term} -> ok = Res; - _ -> ok + case State#ch.state of + closing -> ok; + _ -> Res = rollback_and_notify(State), + case Reason of + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok + end end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -355,24 +356,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. - send_exception(Reason, State = #ch{channel = Channel, writer_pid = WriterPid, protocol = Protocol, reader_pid = ReaderPid}) -> {_ShouldClose, CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + %% something bad's happened: rollback_and_notify make not be 'ok' + rollback_and_notify(State), + State1 = State#ch{state = closing}, case CloseChannel of Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), - {noreply, State#ch{state = closing}}; + {noreply, State1}; _ -> ReaderPid ! {channel_exit, Channel, Reason}, - {stop, normal, State} + {stop, normal, State1} end. return_queue_declare_ok(#resource{name = ActualName}, @@ -559,6 +558,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid, Self = self(), ReaderPid ! {channel_closing, Channel, fun () -> ok = gen_server2:cast(Self, closed) end}, + %% no error, so rollback_and_notify should be 'ok'. Do in parallel + %% with the reader picking up our message and running our Fun. ok = rollback_and_notify(State), {noreply, State#ch{state = closing}}; @@ -1336,10 +1337,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1beb49d331..a267837d57 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -446,7 +446,7 @@ close_channel(Channel, State) -> handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - erase({ch_pid, ChPid}), + channel_cleanup(ChPid), maybe_close(State); uncontrolled -> case channel_cleanup(ChPid) of |
