summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-09 17:05:04 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-09 17:05:04 +0100
commit446c09257ccdf9d9ec04d6ab902d823e31ed4289 (patch)
treead1a341d542aa7f1d3ff8f30f92a47ab3286a4c6 /src
parentc7a372f8b6279577fd5f03943a1eaa261a3522b2 (diff)
parente6e6c6a683aa253995cb4b0984d767bbe8bbc3dd (diff)
downloadrabbitmq-server-git-446c09257ccdf9d9ec04d6ab902d823e31ed4289.tar.gz
Merging default into bug 15930
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_framing_channel.erl76
-rw-r--r--src/rabbit_plugin_activator.erl6
-rw-r--r--src/rabbit_reader.erl3
3 files changed, 50 insertions, 35 deletions
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 6ee5a5550b..08aaafe106 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -35,12 +35,14 @@
-export([start_link/2, process/2, shutdown/1]).
%% internal
--export([mainloop/2]).
+-export([mainloop/3]).
%%--------------------------------------------------------------------
start_link(ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}.
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -60,46 +62,55 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid, Protocol) ->
+mainloop(Parent, ChannelPid, Protocol) ->
case read_frame(ChannelPid) of
{method, MethodName, FieldsBin} ->
Method = Protocol:decode_method_fields(MethodName, FieldsBin),
case Protocol:method_has_content(MethodName) of
true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid,
- ClassId,
- Protocol));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid, Protocol);
+ case collect_content(ChannelPid, ClassId, Protocol) of
+ {ok, Content} ->
+ rabbit_channel:do(ChannelPid, Method, Content),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol);
+ {error, Reason} ->
+ channel_exit(Parent, Reason, MethodName)
+ end;
+ false -> rabbit_channel:do(ChannelPid, Method),
+ ?MODULE:mainloop(Parent, ChannelPid, Protocol)
+ end;
_ ->
- unexpected_frame("expected method frame, "
- "got non method frame instead",
- [])
+ channel_exit(Parent, {unexpected_frame,
+ "expected method frame, "
+ "got non method frame instead",
+ []}, none)
end.
collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload};
+ case collect_content_payload(ChannelPid, BodySize, []) of
+ {ok, Payload} -> {ok, #content{
+ class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = Payload}};
+ Error -> Error
+ end;
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- unexpected_frame("expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]}};
_ ->
- unexpected_frame("expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ {error, {unexpected_frame,
+ "expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId]}}
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
- Acc;
+ {ok, Acc};
collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
case read_frame(ChannelPid) of
{content_body, FragmentBin} ->
@@ -107,10 +118,13 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- unexpected_frame("expected content body, "
- "got non content body frame instead",
- [])
+ {error, {unexpected_frame,
+ "expected content body, "
+ "got non content body frame instead",
+ []}}
end.
-unexpected_frame(ExplanationFormat, Params) ->
- rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
+channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
+ Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
+ MethodName),
+ Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 35b75a23a9..a170fb1da8 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -35,7 +35,6 @@
-define(DefaultPluginDir, "plugins").
-define(DefaultUnpackedPluginDir, "priv/plugins").
--define(DefaultRabbitEBin, "ebin").
-define(BaseApps, [rabbit]).
%%----------------------------------------------------------------------------
@@ -58,9 +57,8 @@ start() ->
%% Determine our various directories
PluginDir = get_env(plugins_dir, ?DefaultPluginDir),
UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
- RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
- RootName = RabbitEBin ++ "/rabbit",
+ RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
@@ -86,7 +84,7 @@ start() ->
{erts, erlang:system_info(version)},
AppVersions},
- %% Write it out to ebin/rabbit.rel
+ %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% Compile the script
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ce3d879cf4..6e336c8641 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -452,6 +452,9 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
+handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
+ {channel, Channel} = get({chpid, ChPid}),
+ handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).