summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-01 19:20:06 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-01 19:20:06 +0000
commit707a504e264b26322a44cee4f0748de0ca964d04 (patch)
tree42bc1340eef5968c9143fae518244a32fad9a200
parent685ed08edacd82e871fadac07b97d8cf6d15a029 (diff)
downloadrabbitmq-server-git-707a504e264b26322a44cee4f0748de0ca964d04.tar.gz
replace rabbit_framing_channel with tiny state machine
...which we store in the reader's process dict This simplifies the interactions between the channel and the reader. Note that the reader now monitors the channel processes rather than their sups. This makes more sense since it interacts with the former but never the latter.
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_framing_channel.erl46
-rw-r--r--src/rabbit_reader.erl110
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 02199a6516..83e91f6c4e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
[Channel, ReaderPid, WriterPid, Username, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, FramingState} = rabbit_framing_channel:init(Protocol),
+ {ok, SupPid, {ChannelPid, FramingState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780a5..fd99af5636 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 9243ea1673..57089fc2c5 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,51 +32,11 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
+-export([init/1, collect/2]).
%%--------------------------------------------------------------------
-mainloop(Parent, ChannelPid, State) ->
- Loop = fun (NewState) ->
- ?MODULE:mainloop(Parent, ChannelPid, NewState)
- end,
- receive
- {frame, Frame} ->
- case collect(Frame, State) of
- {ok, NewState} ->
- Loop(NewState);
- {ok, Method, NewState} ->
- rabbit_channel:do(ChannelPid, Method),
- Loop(NewState);
- {ok, Method, Content, NewState} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- Loop(NewState);
- {error, Reason} ->
- Parent ! {channel_exit, self(), Reason}
- end;
- terminate ->
- rabbit_channel:shutdown(ChannelPid),
- Loop(State);
- Msg ->
- exit({unexpected_message, Msg})
- end.
+init(Protocol) -> {ok, {method, Protocol}}.
collect({method, MethodName, FieldsBin}, {method, Protocol}) ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
unexpected_frame("expected content body, "
"got non content body frame instead", [], Method).
+%%--------------------------------------------------------------------
+
empty_content(ClassId, PropertiesBin, Protocol) ->
#content{class_id = ClassId,
properties = none,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 92a2f4d7fe..9b30401854 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,45 +443,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ State1 = process_channel_frame(
+ AnalyzedFrame, Channel, ChPid, FramingState,
+ State),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
- State;
+ State1;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
+ true -> State1#v1{connection_state = blocked};
+ false -> State1
end;
_ ->
- State
+ State1
end;
closing ->
%% According to the spec, after sending a
@@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, ChFrSt}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ put({channel, Channel}, {ChPid, ChFrSt}),
+ put({ch_pid, ChPid}, Channel),
+ process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State).
+
+process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) ->
+ UpdateFramingState = fun (NewChFrSt) ->
+ put({channel, Channel}, {ChPid, NewChFrSt}),
+ State
+ end,
+ case rabbit_framing_channel:collect(Frame, ChFrSt) of
+ {ok, NewChFrSt} ->
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method),
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, Content, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method, Content),
+ UpdateFramingState(NewChFrSt);
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",