diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-01 19:20:06 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-01 19:20:06 +0000 |
| commit | 707a504e264b26322a44cee4f0748de0ca964d04 (patch) | |
| tree | 42bc1340eef5968c9143fae518244a32fad9a200 /src | |
| parent | 685ed08edacd82e871fadac07b97d8cf6d15a029 (diff) | |
| download | rabbitmq-server-git-707a504e264b26322a44cee4f0748de0ca964d04.tar.gz | |
replace rabbit_framing_channel with tiny state machine
...which we store in the reader's process dict
This simplifies the interactions between the channel and the reader.
Note that the reader now monitors the channel processes rather than
their sups. This makes more sense since it interacts with the former
but never the latter.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel_sup.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_channel_sup_sup.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 110 |
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 02199a6516..83e91f6c4e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -50,7 +50,7 @@ rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()}). --spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). -endif. @@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, [Channel, ReaderPid, WriterPid, Username, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), - {ok, FramingChannelPid} = - supervisor2:start_child( - SupPid, - {framing_channel, {rabbit_framing_channel, start_link, - [ReaderPid, ChannelPid, Protocol]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, SupPid, FramingChannelPid}. + {ok, FramingState} = rabbit_framing_channel:init(Protocol), + {ok, SupPid, {ChannelPid, FramingState}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 21c39780a5..fd99af5636 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,7 +43,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> - {'ok', pid(), pid()}). + {'ok', pid(), {pid(), any()}}). -endif. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 9243ea1673..57089fc2c5 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,51 +32,11 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/3, process/2, shutdown/1]). - -%% internal --export([mainloop/3]). - -%%-------------------------------------------------------------------- - -start_link(Parent, ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link( - fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}. - -process(Pid, Frame) -> - Pid ! {frame, Frame}, - ok. - -shutdown(Pid) -> - Pid ! terminate, - ok. +-export([init/1, collect/2]). %%-------------------------------------------------------------------- -mainloop(Parent, ChannelPid, State) -> - Loop = fun (NewState) -> - ?MODULE:mainloop(Parent, ChannelPid, NewState) - end, - receive - {frame, Frame} -> - case collect(Frame, State) of - {ok, NewState} -> - Loop(NewState); - {ok, Method, NewState} -> - rabbit_channel:do(ChannelPid, Method), - Loop(NewState); - {ok, Method, Content, NewState} -> - rabbit_channel:do(ChannelPid, Method, Content), - Loop(NewState); - {error, Reason} -> - Parent ! {channel_exit, self(), Reason} - end; - terminate -> - rabbit_channel:shutdown(ChannelPid), - Loop(State); - Msg -> - exit({unexpected_message, Msg}) - end. +init(Protocol) -> {ok, {method, Protocol}}. collect({method, MethodName, FieldsBin}, {method, Protocol}) -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), @@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) -> unexpected_frame("expected content body, " "got non content body frame instead", [], Method). +%%-------------------------------------------------------------------- + empty_content(ClassId, PropertiesBin, Protocol) -> #content{class_id = ClassId, properties = none, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 92a2f4d7fe..9b30401854 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> + {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, ChannelOrFrPid, Reason} -> - mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); - {'DOWN', _MRef, process, ChSupPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); + {channel_exit, Channel, Reason} -> + mainloop(Deb, handle_exception(State, Channel, Reason)); + {'DOWN', _MRef, process, ChPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,45 +443,32 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> - {channel, Channel} = get({ch_fr_pid, ChFrPid}), - handle_exception(State, Channel, Reason); -handle_channel_exit(Channel, Reason, State) -> - handle_exception(State, Channel, Reason). - -handle_dependent_exit(ChSupPid, Reason, State) -> +handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - case erase({ch_sup_pid, ChSupPid}) of - undefined -> ok; - {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) - end, + erase({ch_pid, ChPid}), maybe_close(State); uncontrolled -> - case channel_cleanup(ChSupPid) of - undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); - Channel -> - maybe_close(handle_exception(State, Channel, Reason)) + case channel_cleanup(ChPid) of + undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); + Channel -> maybe_close( + handle_exception(State, Channel, Reason)) end end. -channel_cleanup(ChSupPid) -> - case get({ch_sup_pid, ChSupPid}) of - undefined -> undefined; - {{channel, Channel}, ChFr} -> erase({channel, Channel}), - erase(ChFr), - erase({ch_sup_pid, ChSupPid}), - Channel +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + Channel -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + Channel end. -all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, - {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. terminate_channels() -> NChannels = - length([rabbit_framing_channel:shutdown(ChFrPid) - || ChFrPid <- all_channels()]), + length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'DOWN', _MRef, process, ChSupPid, Reason} -> - case channel_cleanup(ChSupPid) of + {'DOWN', _MRef, process, ChPid, Reason} -> + case channel_cleanup(ChPid) of undefined -> - exit({abnormal_dependent_exit, ChSupPid, Reason}); + exit({abnormal_dependent_exit, ChPid, Reason}); Channel -> case termination_kind(Reason) of controlled -> @@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ch_fr_pid, ChFrPid} -> - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), + {ChPid, FramingState} -> + State1 = process_channel_frame( + AnalyzedFrame, Channel, ChPid, FramingState, + State), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), - State; + State1; {method, MethodName, _} -> - case (State#v1.connection_state =:= blocking andalso + case (State#v1.connection_state =:= blocking + andalso Protocol:method_has_content(MethodName)) of - true -> State#v1{connection_state = blocked}; - false -> State + true -> State1#v1{connection_state = blocked}; + false -> State1 end; _ -> - State + State1 end; closing -> %% According to the spec, after sending a @@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload, State; undefined -> case ?IS_RUNNING(State) of - true -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; + true -> send_to_new_channel( + Channel, AnalyzedFrame, State); false -> throw({channel_frame_while_starting, Channel, State#v1.connection_state, AnalyzedFrame}) @@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, fun() -> internal_emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> - lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, @@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, ChSupPid, ChFrPid} = + {ok, _ChSupPid, {ChPid, ChFrSt}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector}), - erlang:monitor(process, ChSupPid), - put({channel, Channel}, {ch_fr_pid, ChFrPid}), - put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), - put({ch_fr_pid, ChFrPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). + erlang:monitor(process, ChPid), + put({channel, Channel}, {ChPid, ChFrSt}), + put({ch_pid, ChPid}, Channel), + process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State). + +process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) -> + UpdateFramingState = fun (NewChFrSt) -> + put({channel, Channel}, {ChPid, NewChFrSt}), + State + end, + case rabbit_framing_channel:collect(Frame, ChFrSt) of + {ok, NewChFrSt} -> + UpdateFramingState(NewChFrSt); + {ok, Method, NewChFrSt} -> + rabbit_channel:do(ChPid, Method), + UpdateFramingState(NewChFrSt); + {ok, Method, Content, NewChFrSt} -> + rabbit_channel:do(ChPid, Method, Content), + UpdateFramingState(NewChFrSt); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end. log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |
