summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_framing_channel.erl46
-rw-r--r--src/rabbit_reader.erl110
4 files changed, 66 insertions, 103 deletions
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 02199a6516..83e91f6c4e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
[Channel, ReaderPid, WriterPid, Username, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, FramingState} = rabbit_framing_channel:init(Protocol),
+ {ok, SupPid, {ChannelPid, FramingState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780a5..fd99af5636 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 9243ea1673..57089fc2c5 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,51 +32,11 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, {method, Protocol}) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
+-export([init/1, collect/2]).
%%--------------------------------------------------------------------
-mainloop(Parent, ChannelPid, State) ->
- Loop = fun (NewState) ->
- ?MODULE:mainloop(Parent, ChannelPid, NewState)
- end,
- receive
- {frame, Frame} ->
- case collect(Frame, State) of
- {ok, NewState} ->
- Loop(NewState);
- {ok, Method, NewState} ->
- rabbit_channel:do(ChannelPid, Method),
- Loop(NewState);
- {ok, Method, Content, NewState} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- Loop(NewState);
- {error, Reason} ->
- Parent ! {channel_exit, self(), Reason}
- end;
- terminate ->
- rabbit_channel:shutdown(ChannelPid),
- Loop(State);
- Msg ->
- exit({unexpected_message, Msg})
- end.
+init(Protocol) -> {ok, {method, Protocol}}.
collect({method, MethodName, FieldsBin}, {method, Protocol}) ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -117,6 +77,8 @@ collect(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
unexpected_frame("expected content body, "
"got non content body frame instead", [], Method).
+%%--------------------------------------------------------------------
+
empty_content(ClassId, PropertiesBin, Protocol) ->
#content{class_id = ClassId,
properties = none,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 92a2f4d7fe..9b30401854 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -347,12 +347,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,45 +443,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -499,10 +486,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -565,20 +552,23 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ State1 = process_channel_frame(
+ AnalyzedFrame, Channel, ChPid, FramingState,
+ State),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
- State;
+ State1;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
+ true -> State1#v1{connection_state = blocked};
+ false -> State1
end;
_ ->
- State
+ State1
end;
closing ->
%% According to the spec, after sending a
@@ -601,9 +591,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -809,7 +798,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -971,15 +960,32 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, ChFrSt}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ put({channel, Channel}, {ChPid, ChFrSt}),
+ put({ch_pid, ChPid}, Channel),
+ process_channel_frame(AnalyzedFrame, Channel, ChPid, ChFrSt, State).
+
+process_channel_frame(Frame, Channel, ChPid, ChFrSt, State) ->
+ UpdateFramingState = fun (NewChFrSt) ->
+ put({channel, Channel}, {ChPid, NewChFrSt}),
+ State
+ end,
+ case rabbit_framing_channel:collect(Frame, ChFrSt) of
+ {ok, NewChFrSt} ->
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method),
+ UpdateFramingState(NewChFrSt);
+ {ok, Method, Content, NewChFrSt} ->
+ rabbit_channel:do(ChPid, Method, Content),
+ UpdateFramingState(NewChFrSt);
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",