diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-10 16:11:12 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-10 16:11:12 +0000 |
| commit | 090521e9a8b01c1286d3ba15f045f627f9845d84 (patch) | |
| tree | 54db57ed67d3776b7f3fb362600219a0639b5855 /src | |
| parent | cc6a2cffd0e8c4bb1a13e5acca2fc3d18d3867ee (diff) | |
| download | rabbitmq-server-git-090521e9a8b01c1286d3ba15f045f627f9845d84.tar.gz | |
Make client-initiated channel.close work in new scheme
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 |
3 files changed, 21 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e80d881809..d0a1e1f7a2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -219,16 +219,12 @@ handle_cast({method, Method, Content}, State) -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), noreply(NewState); {noreply, NewState} -> - noreply(NewState); - stop -> - {stop, normal, State#ch{state = terminating}} + noreply(NewState) catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), {stop, normal, terminating(Reason#amqp_error{method = MethodName}, State)}; - exit:normal -> - {stop, normal, State}; _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -236,6 +232,10 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(closed, State = #ch{state = closing, writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -529,10 +529,13 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid, + channel = Channel}) -> + Self = self(), + ReaderPid ! {channel_closing, Channel, + fun () -> ok = gen_server2:cast(Self, closed) end}, ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), - stop; + {noreply, State#ch{state = closing}}; handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1781469a13..1beb49d331 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -328,6 +328,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> mainloop(Deb, internal_conserve_memory(Conserve, State)); + {channel_closing, Channel, Fun} -> + ok = Fun(), + erase({channel, Channel}), + mainloop(Deb, State); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -552,9 +556,6 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), - State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking andalso diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 58c369b5a3..82eb1bebc2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1019,9 +1019,9 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + rabbit_framing_amqp_0_9_1, 1, self(), Writer, user(<<"user">>), + <<"/">>, self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1079,9 +1079,9 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, - user(<<"guest">>), <<"/">>, self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + rabbit_framing_amqp_0_9_1, 1, Me, Writer, user(<<"guest">>), + <<"/">>, self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) |
