diff options
| -rw-r--r-- | src/rabbit_channel_sup.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_channel_sup_sup.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 110 |
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 02199a6516..83e91f6c4e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -50,7 +50,7 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). -endif. @@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, [Channel, ReaderPid, WriterPid, Username, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingChannelPid} = - supervisor2:start_child( - SupPid, - {framing_channel, {rabbit_framing_channel, start_link, - [ReaderPid, ChannelPid, Protocol]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, SupPid, FramingChannelPid}. + {ok, FramingState} = rabbit_framing_channel:init(Protocol), + {ok, SupPid, {ChannelPid, FramingState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 21c39780a5..fd99af5636 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()}). + {'ok', pid(), {pid(), any()}}). -endif. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 9243ea1673..57089fc2c5 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,51 +32,11 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/3, process/2, shutdown/1]). - -%% internal --export([mainloop/3]). - -%%-------------------------------------------------------------------- - -start_link(Parent, ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}. - -process(Pid, Frame) -> - Pid ! {frame, Frame}, - ok. - -shutdown(Pid) -> - Pid ! terminate, - ok. +-export([init/1, collect/2]). %%-------------------------------------------------------------------- -mainloop(Parent, ChannelPid, State) -> - Loop = fun (NewState) -> - ?MODULE:mainloop(Parent, ChannelPid, NewState) - end, - receive - {frame, Frame} -> - case collect(Frame, State) of - {ok, NewState} -> - Loop(NewState); - {ok, Method, NewState} -> - rabbit_channel:do(ChannelPid, Method), - Loop(NewState); - {ok, Method, Content, NewState} -> - rabbit_channel:do(ChannelPid, Method, Content), - Loop(NewState); - {error, Reason} -> - Parent ! {channel_exit, self(), Reason} - end; - terminate -> - rabbit_channel:shutdown(ChannelPid), - Loop(State); - Msg -> - exit({unexpected_message, Msg}) - end. +init(Protocol) -> {ok, {method, Protocol}}. collect({method, MethodName, FieldsBin}, {method, Protocol}) -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), @@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> unexpected_frame("expected content body, " "got non content body frame instead", [], Method). +%%-------------------------------------------------------------------- + empty_content(ClassId, PropertiesBin, Protocol) -> #content{class_id = ClassId, properties = none, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 92a2f4d7fe..9b30401854 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> + {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, ChannelOrFrPid, Reason} -> - mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'DOWN', _MRef, process, ChSupPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); + {channel_exit, Channel, Reason} -> + mainloop(Deb, handle_exception(State, Channel, Reason)); + {'DOWN', _MRef, process, ChPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,45 +443,32 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> - {channel, Channel} = get({ch_fr_pid, ChFrPid}), - handle_exception(State, Channel, Reason); -handle_channel_exit(Channel, Reason, State) -> - handle_exception(State, Channel, Reason). - -handle_dependent_exit(ChSupPid, Reason, State) -> +handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - case erase({ch_sup_pid, ChSupPid}) of - undefined -> ok; - {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) - end, + erase({ch_pid, ChPid}), maybe_close(State); uncontrolled -> - case channel_cleanup(ChSupPid) of - undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); - Channel -> - maybe_close(handle_exception(State, Channel, Reason)) + case channel_cleanup(ChPid) of + undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); + Channel -> maybe_close( + handle_exception(State, Channel, Reason)) end end. -channel_cleanup(ChSupPid) -> - case get({ch_sup_pid, ChSupPid}) of - undefined -> undefined; - {{channel, Channel}, ChFr} -> erase({channel, Channel}), - erase(ChFr), - erase({ch_sup_pid, ChSupPid}), - Channel +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + Channel -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + Channel end. -all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, - {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(ChFrPid) - || ChFrPid <- all_channels()]), + length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'DOWN', _MRef, process, ChSupPid, Reason} -> - case channel_cleanup(ChSupPid) of + {'DOWN', _MRef, process, ChPid, Reason} -> + case channel_cleanup(ChPid) of undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); + exit({abnormal_dependent_exit, ChPid, Reason}); Channel -> case termination_kind(Reason) of controlled -> @@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ch_fr_pid, ChFrPid} -> - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), + {ChPid, FramingState} -> + State1 = process_channel_frame( + AnalyzedFrame, Channel, ChPid, FramingState, + State), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), - State; + State1; {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking andalso + case (State#v1.connection_state =:= blocking + andalso Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State + true -> State1#v1{connection_state = blocked}; + false -> State1 end; _ -> - State + State1 end; closing -> %% According to the spec, after sending a @@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload, State; undefined -> case ?IS_RUNNING(State) of - true -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; + true -> send_to_new_channel( + Channel, AnalyzedFrame, State); false -> throw({channel_frame_while_starting, Channel, State#v1.connection_state, AnalyzedFrame}) @@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, @@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, ChSupPid, ChFrPid} = + {ok, _ChSupPid, {ChPid, ChFrSt}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), - erlang:monitor(process, ChSupPid), - put({channel, Channel}, {ch_fr_pid, ChFrPid}), - put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), - put({ch_fr_pid, ChFrPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). + erlang:monitor(process, ChPid), + put({channel, Channel}, {ChPid, ChFrSt}), + put({ch_pid, ChPid}, Channel), + process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State). + +process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) -> + UpdateFramingState = fun (NewChFrSt) -> + put({channel, Channel}, {ChPid, NewChFrSt}), + State + end, + case rabbit_framing_channel:collect(Frame, ChFrSt) of + {ok, NewChFrSt} -> + UpdateFramingState(NewChFrSt); + {ok, Method, NewChFrSt} -> + rabbit_channel:do(ChPid, Method), + UpdateFramingState(NewChFrSt); + {ok, Method, Content, NewChFrSt} -> + rabbit_channel:do(ChPid, Method, Content), + UpdateFramingState(NewChFrSt); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end. log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |
