summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl39
-rw-r--r--src/rabbit_reader.erl2
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d5a0ca38a7..cc0192b0b4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -306,18 +306,19 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
-terminate(_Reason, State = #ch{state = terminating}) ->
- terminate(State);
-
terminate(Reason, State) ->
- Res = rollback_and_notify(State),
- case Reason of
- normal -> ok = Res;
- shutdown -> ok = Res;
- {shutdown, _Term} -> ok = Res;
- _ -> ok
+ case State#ch.state of
+ closing -> ok;
+ _ -> Res = rollback_and_notify(State),
+ case Reason of
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
+ end
end,
- terminate(State).
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -355,24 +356,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
- ok = rollback_and_notify(State),
- Reader ! {channel_exit, Channel, Reason},
- State#ch{state = terminating}.
-
send_exception(Reason, State = #ch{channel = Channel,
writer_pid = WriterPid,
protocol = Protocol,
reader_pid = ReaderPid}) ->
{_ShouldClose, CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ %% something bad's happened: rollback_and_notify make not be 'ok'
+ rollback_and_notify(State),
+ State1 = State#ch{state = closing},
case CloseChannel of
Channel ->
ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State#ch{state = closing}};
+ {noreply, State1};
_ ->
ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State}
+ {stop, normal, State1}
end.
return_queue_declare_ok(#resource{name = ActualName},
@@ -559,6 +558,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid,
Self = self(),
ReaderPid ! {channel_closing, Channel,
fun () -> ok = gen_server2:cast(Self, closed) end},
+ %% no error, so rollback_and_notify should be 'ok'. Do in parallel
+ %% with the reader picking up our message and running our Fun.
ok = rollback_and_notify(State),
{noreply, State#ch{state = closing}};
@@ -1336,10 +1337,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-terminate(_State) ->
- pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1beb49d331..a267837d57 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -446,7 +446,7 @@ close_channel(Channel, State) ->
handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- erase({ch_pid, ChPid}),
+ channel_cleanup(ChPid),
maybe_close(State);
uncontrolled ->
case channel_cleanup(ChPid) of