diff options
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 50 |
3 files changed, 61 insertions, 49 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4b157cbc46..efcadc40ec 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -67,7 +67,7 @@ -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). --record(amqp_error, {name, explanation, method = none}). +-record(amqp_error, {name, explanation = "", method = none}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 01ac4f027f..ba5efb266f 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -46,6 +46,7 @@ build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). +-export([map_exception/2, amqp_exception/1]). -import(lists). @@ -63,6 +64,10 @@ -spec(generate_table/1 :: (amqp_table()) -> binary()). -spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). +-spec(map_exception/2 :: (non_neg_integer(), amqp_error()) -> + {bool(), non_neg_integer(), amqp_method()}). +-spec(amqp_exception/1 :: (amqp_error()) -> {bool(), non_neg_integer(), binary(), + non_neg_integer(), non_neg_integer()}). -endif. @@ -262,3 +267,56 @@ check_empty_content_body_frame_size() -> exit({incorrect_empty_content_body_frame_size, ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. + +map_exception(Channel, Reason) -> + {SuggestedClose, ReplyCode, ReplyText, ClassId, MethodId} = + amqp_exception(Reason), + ShouldClose = SuggestedClose or (Channel == 0), + {CloseChannel, CloseMethod} = + case ShouldClose of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end, + {ShouldClose, CloseChannel, CloseMethod}. + +%% NB: this function is also used by the Erlang client +amqp_exception(Reason) -> + {ShouldClose, ReplyCode, ReplyText, FailedMethod} = + lookup_amqp_exception(Reason), + {ClassId, MethodId} = case FailedMethod of + {_, _} -> FailedMethod; + none -> {0, 0}; + _ -> rabbit_framing:method_id(FailedMethod) + end, + {ShouldClose, ReplyCode, ReplyText, ClassId, MethodId}. + +%% FIXME: this clause can go when we move to AMQP spec >=8.1 +lookup_amqp_exception(#amqp_error{name = precondition_failed, + explanation = Expl, + method = Method}) -> + ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), + {false, 406, ExplBin, Method}; +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}) -> + {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; +lookup_amqp_exception(Other) -> + rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), + {ShouldClose, Code, Text} = + rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e78d889d58..ad433e2ec1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -727,7 +727,8 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> send_exception(State, Channel, Reason). send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), + {ShouldClose, CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); @@ -736,50 +737,3 @@ send_exception(State, Channel, Reason) -> ok = rabbit_writer:internal_send_command( NewState#v1.sock, CloseChannel, CloseMethod), NewState. - -map_exception(Channel, Reason) -> - {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), - ShouldClose = SuggestedClose or (Channel == 0), - {ClassId, MethodId} = case FailedMethod of - {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) - end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. - -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; -lookup_amqp_exception(#amqp_error{name = Name, - explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), - ExplBin = amqp_exception_explanation(Text, Expl), - {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> - rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), - {ShouldClose, Code, Text, none}. - -amqp_exception_explanation(Text, Expl) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end. |
