summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_reader.erl7
-rw-r--r--src/rabbit_tests.erl12
3 files changed, 21 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e80d881809..d0a1e1f7a2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -219,16 +219,12 @@ handle_cast({method, Method, Content}, State) ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
noreply(NewState);
{noreply, NewState} ->
- noreply(NewState);
- stop ->
- {stop, normal, State#ch{state = terminating}}
+ noreply(NewState)
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
{stop, normal, terminating(Reason#amqp_error{method = MethodName},
State)};
- exit:normal ->
- {stop, normal, State};
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -236,6 +232,10 @@ handle_cast({method, Method, Content}, State) ->
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State), hibernate};
+handle_cast(closed, State = #ch{state = closing, writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+ {stop, normal, State};
+
handle_cast(terminate, State) ->
{stop, normal, State};
@@ -529,10 +529,13 @@ handle_method(#'channel.open'{}, _, _State) ->
handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
+handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid,
+ channel = Channel}) ->
+ Self = self(),
+ ReaderPid ! {channel_closing, Channel,
+ fun () -> ok = gen_server2:cast(Self, closed) end},
ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
- stop;
+ {noreply, State#ch{state = closing}};
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1781469a13..1beb49d331 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -328,6 +328,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
mainloop(Deb, internal_conserve_memory(Conserve, State));
+ {channel_closing, Channel, Fun} ->
+ ok = Fun(),
+ erase({channel, Channel}),
+ mainloop(Deb, State);
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -552,9 +556,6 @@ handle_frame(Type, Channel, Payload,
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
- {method, 'channel.close', _} ->
- erase({channel, Channel}),
- State;
{method, MethodName, _} ->
case (State#v1.connection_state =:= blocking
andalso
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 58c369b5a3..82eb1bebc2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1019,9 +1019,9 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ rabbit_framing_amqp_0_9_1, 1, self(), Writer, user(<<"user">>),
+ <<"/">>, self(), fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1079,9 +1079,9 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
- fun (_) -> {ok, self()} end),
+ {ok, Ch} = rabbit_channel:start_link(
+ rabbit_framing_amqp_0_9_1, 1, Me, Writer, user(<<"guest">>),
+ <<"/">>, self(), fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)