summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-07-17 10:52:22 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-07-17 10:52:22 +0100
commit0008e5f11fa2f3f7b9c36e89dea7a94f85aeef21 (patch)
tree25c3526a3687b742ac85dcbc6cb3602d1b92484c
parent41e009229feab6f1b89a1e0867ace49d051968e8 (diff)
parenta8aa180d3ee05f2bbf2c75aed3b86fad7d070950 (diff)
downloadrabbitmq-server-git-0008e5f11fa2f3f7b9c36e89dea7a94f85aeef21.tar.gz
merge default into bug24991
-rw-r--r--src/rabbit_channel.erl52
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_nodes.erl4
-rw-r--r--src/rabbit_prelaunch.erl6
-rw-r--r--src/rabbit_reader.erl134
5 files changed, 103 insertions, 100 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22c6a22361..864e100afc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow},
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- send_exception(Reason#amqp_error{method = MethodName}, State);
+ handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -400,11 +400,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
@@ -418,6 +418,11 @@ send_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+precondition_failed(Format) -> precondition_failed(Format, []).
+
+precondition_failed(Format, Params) ->
+ rabbit_misc:protocol_error(precondition_failed, Format, Params).
+
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
@@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username},
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual}}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "user_id property set to '~s' but "
- "authenticated user was '~s'", [Claimed, Actual]).
+ precondition_failed(
+ "user_id property set to '~s' but authenticated user was '~s'",
+ [Claimed, Actual]).
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
State1#ch{uncommitted_message_q = NewTMQ}
end};
{error, Reason} ->
- rabbit_misc:protocol_error(precondition_failed,
- "invalid message: ~p", [Reason])
+ precondition_failed("invalid message: ~p", [Reason])
end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
@@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount})
@@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from confirm to tx mode", []);
+ precondition_failed("cannot switch from confirm to tx mode");
handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.commit'{}, _,
State = #ch{uncommitted_message_q = TMQ,
@@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _,
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
uncommitted_acks = TAL,
@@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from tx to confirm mode", []);
+ precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- rabbit_misc:protocol_error(
- precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
ack(Acked, State) ->
@@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
State#ch{tx_status = in_progress};
complete_tx(State = #ch{tx_status = failed}) ->
- {noreply, State1} = send_exception(
+ {noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b15..1fefa68877 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,7 +36,7 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([tcp_name/3]).
+-export([tcp_name/3, format_inet_error/1]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -152,6 +152,7 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
+-spec(format_inet_error/1 :: (atom()) -> string()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
@@ -510,6 +511,10 @@ tcp_name(Prefix, IPAddress, Port)
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
+format_inet_error(address) -> "cannot connect to host/port";
+format_inet_error(timeout) -> "timed out";
+format_inet_error(Error) -> inet:format_error(Error).
+
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 1c23632d52..c8d77b0f87 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -70,8 +70,8 @@ diagnostics0() ->
diagnostics_host(Host) ->
case names(Host) of
{error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [Host, EpmdReason]};
+ {"- unable to connect to epmd on ~s: ~w (~s)",
+ [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]};
{ok, NamePorts} ->
{"- ~s: ~p",
[Host, [{list_to_atom(Name), Port} ||
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d56211b50e..b045443533 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) ->
{error, EpmdReason} ->
rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
- case EpmdReason of
- address -> "unable to establish tcp connection";
- timeout -> "timed out establishing tcp connection";
- _ -> inet:format_error(EpmdReason)
- end])
+ rabbit_misc:format_inet_error(EpmdReason)])
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f81e94ba76..ee9b22c2ec 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
Channel, Reason))
end.
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +470,53 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+ State;
+handle_exception(State, Channel, Reason) ->
+ send_exception(State, Channel, Reason).
+
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) ->
Channel, State#v1.connection_state, Frame})
end.
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
+ end.
+
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
control_throttle(State);
@@ -536,6 +587,8 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
control_throttle(State).
+%%--------------------------------------------------------------------------
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
#v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE ->
@@ -546,8 +599,8 @@ handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize},
- PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
+ State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
switch_callback(handle_frame(Type, Channel, Payload, State),
@@ -839,8 +892,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end);
+ socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
+ fun ([{_, I}]) -> I end, Sock);
i(state, #v1{connection_state = S}) ->
S;
i(last_blocked_by, #v1{last_blocked_by = By}) ->
@@ -876,10 +929,7 @@ i(Item, #v1{}) ->
throw({bad_argument, Item}).
socket_info(Get, Select, Sock) ->
- socket_info(fun() -> Get(Sock) end, Select).
-
-socket_info(Get, Select) ->
- case Get() of
+ case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
@@ -902,50 +952,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1.
-
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).