diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 17:05:04 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-09 17:05:04 +0100 |
| commit | 446c09257ccdf9d9ec04d6ab902d823e31ed4289 (patch) | |
| tree | ad1a341d542aa7f1d3ff8f30f92a47ab3286a4c6 /src | |
| parent | c7a372f8b6279577fd5f03943a1eaa261a3522b2 (diff) | |
| parent | e6e6c6a683aa253995cb4b0984d767bbe8bbc3dd (diff) | |
| download | rabbitmq-server-git-446c09257ccdf9d9ec04d6ab902d823e31ed4289.tar.gz | |
Merging default into bug 15930
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_framing_channel.erl | 76 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 3 |
3 files changed, 50 insertions, 35 deletions
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 6ee5a5550b..08aaafe106 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -35,12 +35,14 @@ -export([start_link/2, process/2, shutdown/1]). %% internal --export([mainloop/2]). +-export([mainloop/3]). %%-------------------------------------------------------------------- start_link(ChannelPid, Protocol) -> - {ok, proc_lib:spawn_link(fun () -> mainloop(ChannelPid, Protocol) end)}. + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -60,46 +62,55 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid, Protocol) -> +mainloop(Parent, ChannelPid, Protocol) -> case read_frame(ChannelPid) of {method, MethodName, FieldsBin} -> Method = Protocol:decode_method_fields(MethodName, FieldsBin), case Protocol:method_has_content(MethodName) of true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), - rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, - ClassId, - Protocol)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid, Protocol); + case collect_content(ChannelPid, ClassId, Protocol) of + {ok, Content} -> + rabbit_channel:do(ChannelPid, Method, Content), + ?MODULE:mainloop(Parent, ChannelPid, Protocol); + {error, Reason} -> + channel_exit(Parent, Reason, MethodName) + end; + false -> rabbit_channel:do(ChannelPid, Method), + ?MODULE:mainloop(Parent, ChannelPid, Protocol) + end; _ -> - unexpected_frame("expected method frame, " - "got non method frame instead", - []) + channel_exit(Parent, {unexpected_frame, + "expected method frame, " + "got non method frame instead", + []}, none) end. collect_content(ChannelPid, ClassId, Protocol) -> case read_frame(ChannelPid) of {content_header, ClassId, 0, BodySize, PropertiesBin} -> - Payload = collect_content_payload(ChannelPid, BodySize, []), - #content{class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - protocol = Protocol, - payload_fragments_rev = Payload}; + case collect_content_payload(ChannelPid, BodySize, []) of + {ok, Payload} -> {ok, #content{ + class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + protocol = Protocol, + payload_fragments_rev = Payload}}; + Error -> Error + end; {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - unexpected_frame("expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]); + {error, {unexpected_frame, + "expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]}}; _ -> - unexpected_frame("expected content header for class ~w, " - "got non content header frame instead", - [ClassId]) + {error, {unexpected_frame, + "expected content header for class ~w, " + "got non content header frame instead", + [ClassId]}} end. collect_content_payload(_ChannelPid, 0, Acc) -> - Acc; + {ok, Acc}; collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> case read_frame(ChannelPid) of {content_body, FragmentBin} -> @@ -107,10 +118,13 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); _ -> - unexpected_frame("expected content body, " - "got non content body frame instead", - []) + {error, {unexpected_frame, + "expected content body, " + "got non content body frame instead", + []}} end. -unexpected_frame(ExplanationFormat, Params) -> - rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params). +channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) -> + Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params, + MethodName), + Parent ! {channel_exit, self(), Reason}. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 35b75a23a9..a170fb1da8 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -35,7 +35,6 @@ -define(DefaultPluginDir, "plugins"). -define(DefaultUnpackedPluginDir, "priv/plugins"). --define(DefaultRabbitEBin, "ebin"). -define(BaseApps, [rabbit]). %%---------------------------------------------------------------------------- @@ -58,9 +57,8 @@ start() -> %% Determine our various directories PluginDir = get_env(plugins_dir, ?DefaultPluginDir), UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), - RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), - RootName = RabbitEBin ++ "/rabbit", + RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -86,7 +84,7 @@ start() -> {erts, erlang:system_info(version)}, AppVersions}, - %% Write it out to ebin/rabbit.rel + %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ce3d879cf4..6e336c8641 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -452,6 +452,9 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. +handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> + {channel, Channel} = get({chpid, ChPid}), + handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). |
