diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-17 10:52:22 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-17 10:52:22 +0100 |
| commit | 0008e5f11fa2f3f7b9c36e89dea7a94f85aeef21 (patch) | |
| tree | 25c3526a3687b742ac85dcbc6cb3602d1b92484c | |
| parent | 41e009229feab6f1b89a1e0867ace49d051968e8 (diff) | |
| parent | a8aa180d3ee05f2bbf2c75aed3b86fad7d070950 (diff) | |
| download | rabbitmq-server-git-0008e5f11fa2f3f7b9c36e89dea7a94f85aeef21.tar.gz | |
merge default into bug24991
| -rw-r--r-- | src/rabbit_channel.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 134 |
5 files changed, 103 insertions, 100 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 22c6a22361..864e100afc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow}, catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - send_exception(Reason#amqp_error{method = MethodName}, State); + handle_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -400,11 +400,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid}) -> {CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", @@ -418,6 +418,11 @@ send_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +precondition_failed(Format) -> precondition_failed(Format, []). + +precondition_failed(Format, Params) -> + rabbit_misc:protocol_error(precondition_failed, Format, Params). + return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, @@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username}, ok; check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{user = #user{username = Actual}}) -> - rabbit_misc:protocol_error( - precondition_failed, "user_id property set to '~s' but " - "authenticated user was '~s'", [Claimed, Actual]). + precondition_failed( + "user_id property set to '~s' but authenticated user was '~s'", + [Claimed, Actual]). check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, @@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, State1#ch{uncommitted_message_q = NewTMQ} end}; {error, Reason} -> - rabbit_misc:protocol_error(precondition_failed, - "invalid message: ~p", [Reason]) + precondition_failed("invalid message: ~p", [Reason]) end; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, @@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - rabbit_misc:protocol_error( - precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); + precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - rabbit_misc:protocol_error( - precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); + precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - rabbit_misc:protocol_error( - precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); + precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}) @@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin, #'queue.purge_ok'{message_count = PurgedMessageCount}); handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from confirm to tx mode", []); + precondition_failed("cannot switch from confirm to tx mode"); handle_method(#'tx.select'{}, _, State) -> {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}}; handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); + precondition_failed("channel is not transactional"); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, @@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _, {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); + precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, uncommitted_acks = TAL, @@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from tx to confirm mode", []); + precondition_failed("cannot switch from tx to confirm mode"); handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, @@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - rabbit_misc:protocol_error( - precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) + precondition_failed("unknown delivery tag ~w", [DeliveryTag]) end. ack(Acked, State) -> @@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) -> ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), State#ch{tx_status = in_progress}; complete_tx(State = #ch{tx_status = failed}) -> - {noreply, State1} = send_exception( + {noreply, State1} = handle_exception( rabbit_misc:amqp_error( precondition_failed, "partial tx completion", [], 'tx.commit'), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d41aa09b15..1fefa68877 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -36,7 +36,7 @@ -export([execute_mnesia_transaction/2]). -export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). --export([tcp_name/3]). +-export([tcp_name/3, format_inet_error/1]). -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -152,6 +152,7 @@ -spec(tcp_name/3 :: (atom(), inet:ip_address(), rabbit_networking:ip_port()) -> atom()). +-spec(format_inet_error/1 :: (atom()) -> string()). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), @@ -510,6 +511,10 @@ tcp_name(Prefix, IPAddress, Port) list_to_atom( format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). +format_inet_error(address) -> "cannot connect to host/port"; +format_inet_error(timeout) -> "timed out"; +format_inet_error(Error) -> inet:format_error(Error). + %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about %% the order in which results are received. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 1c23632d52..c8d77b0f87 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -70,8 +70,8 @@ diagnostics0() -> diagnostics_host(Host) -> case names(Host) of {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [Host, EpmdReason]}; + {"- unable to connect to epmd on ~s: ~w (~s)", + [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]}; {ok, NamePorts} -> {"- ~s: ~p", [Host, [{list_to_atom(Name), Port} || diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d56211b50e..b045443533 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) -> {error, EpmdReason} -> rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n", [NodeHost, EpmdReason, - case EpmdReason of - address -> "unable to establish tcp connection"; - timeout -> "timed out establishing tcp connection"; - _ -> inet:format_error(EpmdReason) - end]) + rabbit_misc:format_inet_error(EpmdReason)]) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f81e94ba76..ee9b22c2ec 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +%%-------------------------------------------------------------------------- + log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) -> update_last_blocked_by(State = #v1{conserve_resources = false}) -> State#v1{last_blocked_by = flow}. +%%-------------------------------------------------------------------------- +%% error handling / termination + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) -> Channel, Reason)) end. -channel_cleanup(ChPid) -> - case get({ch_pid, ChPid}) of - undefined -> undefined; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - Channel - end. - -all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. - terminate_channels() -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), @@ -477,6 +470,53 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> + State; +handle_exception(State, Channel, Reason) -> + send_exception(State, Channel, Reason). + +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {0, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + terminate_channels(), + State1 = close_connection(State), + ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), + State1. + +%%-------------------------------------------------------------------------- + +create_channel(Channel, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + ok. + +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel + end. + +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. + +%%-------------------------------------------------------------------------- + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) -> Channel, State#v1.connection_state, Frame}) end. +process_channel_frame(Frame, ChPid, AState) -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> {ok, NewAState}; + {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), + {ok, NewAState}; + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), + {ok, NewAState}; + {error, Reason} -> {error, Reason} + end. + post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), control_throttle(State); @@ -536,6 +587,8 @@ post_process_frame({method, MethodName, _}, _ChPid, post_process_frame(_Frame, _ChPid, State) -> control_throttle(State). +%%-------------------------------------------------------------------------- + handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE -> @@ -546,8 +599,8 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1)); -handle_input({frame_payload, Type, Channel, PayloadSize}, - PayloadAndMarker, State) -> +handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, + State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> switch_callback(handle_frame(Type, Channel, Payload, State), @@ -839,8 +892,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end, - fun ([{_, I}]) -> I end); + socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end, + fun ([{_, I}]) -> I end, Sock); i(state, #v1{connection_state = S}) -> S; i(last_blocked_by, #v1{last_blocked_by = By}) -> @@ -876,10 +929,7 @@ i(Item, #v1{}) -> throw({bad_argument, Item}). socket_info(Get, Select, Sock) -> - socket_info(fun() -> Get(Sock) end, Select). - -socket_info(Get, Select) -> - case Get() of + case Get(Sock) of {ok, T} -> Select(T); {error, _} -> '' end. @@ -902,50 +952,6 @@ cert_info(F, Sock) -> {ok, Cert} -> list_to_binary(F(Cert)) end. -%%-------------------------------------------------------------------------- - -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - ok. - -process_channel_frame(Frame, ChPid, AState) -> - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> {ok, NewAState}; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - {ok, NewAState}; - {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( - ChPid, Method, Content), - {ok, NewAState}; - {error, Reason} -> {error, Reason} - end. - -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> - State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> - {0, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), - ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), - State1. - emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). |
