summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Powell <steve@rabbitmq.com>2012-02-01 10:57:20 +0000
committerSteve Powell <steve@rabbitmq.com>2012-02-01 10:57:20 +0000
commitac8c7a57af4b381d44a62ef0ba0447c1740072af (patch)
treeb899eb7eb70d87354b0098589f00934925a660f8
parent1a36ebfe14727586c600ea2d031db5efeddc3959 (diff)
parent33a91402a7bbe105f514557b5de4be70fbf3a798 (diff)
downloadrabbitmq-server-git-ac8c7a57af4b381d44a62ef0ba0447c1740072af.tar.gz
Merge default in
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/credit_flow.erl34
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_log.erl79
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl11
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_msg_store.erl65
-rw-r--r--src/rabbit_net.erl21
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_reader.erl44
-rw-r--r--src/rabbit_tests.erl28
-rw-r--r--src/rabbit_variable_queue.erl12
-rw-r--r--src/tcp_acceptor.erl34
16 files changed, 218 insertions, 140 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9301af6bdc..2fee1114aa 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -37,6 +37,7 @@
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
+ {log_levels, [{connection, info}]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c38eca7ccd..f6a8a30319 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -95,6 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(CREDIT_DISC_BOUND, {2000, 1500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 7df6c92a6d..397aa19169 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -16,8 +16,8 @@
-module(credit_flow).
-%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep
-%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more
+%% Credit starts at MaxCredit and goes down. Both sides keep
+%% track. When the receiver goes below MoreCreditAt it issues more
%% credit by sending a message to the sender. The sender should pass
%% this message in to handle_bump_msg/1. The sender should block when
%% it goes below 0 (check by invoking blocked/0). If a process is both
@@ -25,10 +25,9 @@
%% senders when it is itself blocked - thus the only processes that
%% need to check blocked/0 are ones that read from network sockets.
--define(MAX_CREDIT, 200).
--define(MORE_CREDIT_AT, 150).
+-define(DEFAULT_CREDIT, {200, 150}).
--export([ack/1, handle_bump_msg/1, blocked/0, send/1]).
+-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -36,11 +35,14 @@
-ifdef(use_specs).
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
-spec(ack/1 :: (pid()) -> 'ok').
+-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(peer_down/1 :: (pid()) -> 'ok').
-endif.
@@ -55,12 +57,18 @@
%% variable names are used in credit bookkeeping and want to make
%% sense internally).
-ack(To) ->
+%% For any given pair of processes, ack/2 and send/2 must always be
+%% called with the same credit_spec().
+
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
+
+ack(To, {MaxCredit, MoreCreditAt}) ->
+ MoreCreditAt1 = MoreCreditAt + 1,
Credit =
- case get({credit_to, To}, ?MAX_CREDIT) of
- ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT),
- ?MAX_CREDIT;
- C -> C - 1
+ case get({credit_to, To}, MaxCredit) of
+ MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt),
+ MaxCredit;
+ C -> C - 1
end,
put({credit_to, To}, Credit).
@@ -76,8 +84,10 @@ handle_bump_msg({From, MoreCredit}) ->
blocked() ->
get(credit_blocked, []) =/= [].
-send(From) ->
- Credit = get({credit_from, From}, ?MAX_CREDIT) - 1,
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
+send(From, {MaxCredit, _MoreCreditAt}) ->
+ Credit = get({credit_from, From}, MaxCredit) - 1,
case Credit of
0 -> block(From);
_ -> ok
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c21db21be2..f63a09d310 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1159,6 +1159,10 @@ handle_info(timeout, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 603091b1e0..d8f5508573 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1085,9 +1085,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_acks = TAL}) ->
- ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)),
- State1 = new_tx(State),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8f58f848ee..83cead6e03 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([info/1, info/2, warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -31,7 +31,15 @@
-ifdef(use_specs).
+-export_type([level/0]).
+
+-type(category() :: atom()).
+-type(level() :: 'info' | 'warning' | 'error').
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
+-spec(log/3 :: (category(), level(), string()) -> 'ok').
+-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,53 +50,44 @@
-endif.
%%----------------------------------------------------------------------------
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-info(Fmt) ->
- gen_server:cast(?SERVER, {info, Fmt}).
-
-info(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {info, Fmt, Args}).
+log(Category, Level, Fmt, Args) when is_list(Args) ->
+ gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-warning(Fmt) ->
- gen_server:cast(?SERVER, {warning, Fmt}).
-
-warning(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {warning, Fmt, Args}).
-
-error(Fmt) ->
- gen_server:cast(?SERVER, {error, Fmt}).
-
-error(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {error, Fmt, Args}).
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
%%--------------------------------------------------------------------
-init([]) -> {ok, none}.
+init([]) ->
+ {ok, CatLevelList} = application:get_env(log_levels),
+ CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
+ {ok, orddict:from_list(CatLevels)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({info, Fmt}, State) ->
- error_logger:info_msg(Fmt),
- {noreply, State};
-handle_cast({info, Fmt, Args}, State) ->
- error_logger:info_msg(Fmt, Args),
- {noreply, State};
-handle_cast({warning, Fmt}, State) ->
- error_logger:warning_msg(Fmt),
- {noreply, State};
-handle_cast({warning, Fmt, Args}, State) ->
- error_logger:warning_msg(Fmt, Args),
- {noreply, State};
-handle_cast({error, Fmt}, State) ->
- error_logger:error_msg(Fmt),
- {noreply, State};
-handle_cast({error, Fmt, Args}, State) ->
- error_logger:error_msg(Fmt, Args),
- {noreply, State};
+handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
+ CatLevel = case orddict:find(Category, CatLevels) of
+ {ok, L} -> L;
+ error -> level(info)
+ end,
+ case level(Level) =< CatLevel of
+ false -> ok;
+ true -> (case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end)(Fmt, Args)
+ end,
+ {noreply, CatLevels};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -101,3 +100,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%--------------------------------------------------------------------
+
+level(info) -> 3;
+level(warning) -> 2;
+level(error) -> 1;
+level(none) -> 0.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 8ed2bede32..ee64b5a856 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- {ok, _TRef} =
- timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = dict:new(),
@@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids},
end, Monitors, Pids),
noreply(State #state { monitors = Monitors1 }).
+handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+ gm:broadcast(GM, heartbeat),
+ ensure_gm_heartbeat(),
+ noreply(State);
+
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
death_fun = DeathFun }) ->
@@ -419,3 +423,6 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
+
+ensure_gm_heartbeat() ->
+ erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index baebc52b27..d1caf5aa68 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -137,7 +137,7 @@ add_mirror(Queue, MirrorNode) ->
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
rabbit_log:info(
- "Adding mirror of queue ~s on node ~p: ~p~n",
+ "Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
{ok, _Pid} -> ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 06c5beac01..2cdc763723 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -253,6 +253,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e6a32b9023..495e29762a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
- Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
+ Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId}).
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
+ credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
Clients = dict:from_list(
- [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
+ [{CRef, {undefined, undefined, undefined}} ||
+ CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _MsgId} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
+ {read, _MsgId} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -755,7 +764,7 @@ prioritise_info(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
+handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -789,11 +798,19 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
+ {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:peer_down(CPid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+handle_cast({write, CRef, MsgId, Flow},
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients }) ->
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
@@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
- {undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
- State #msstate {
- cref_to_msg_ids = CTM1 }
+ {_CPid, undefined, _CloseFDsFun} -> State;
+ {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
record_pending_confirm(CRef, MsgId, State) ->
@@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
true -> case dict:fetch(Ref, ClientRefs) of
- {_MsgOnDiskFun, undefined} -> ok;
- {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ {_CPid, _MsgOnDiskFun, undefined} ->
+ ok;
+ {_CPid, _MsgOnDiskFun, CloseFDsFun} ->
+ ok = CloseFDsFun()
end;
false -> ok
end
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b944ec81a1..fef8ae88ac 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1]).
+ sockname/1, peername/1, peercert/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -62,6 +62,8 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(connection_string/2 ::
+ (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-endif.
@@ -141,3 +143,20 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+
+connection_string(Sock, Direction) ->
+ {From, To} = case Direction of
+ inbound -> {fun peername/1, fun sockname/1};
+ outbound -> {fun sockname/1, fun peername/1}
+ end,
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, lists:flatten(
+ io_lib:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort]))};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 923967ead1..7355704a95 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
fun (Sock) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, Reason} ->
{error, {ssl_upgrade_error, Reason}};
@@ -271,6 +269,16 @@ start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connetions.
+ gen_event:which_handlers(error_logger),
+
Reader.
start_client(Sock) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6e2ddedb58..5ebe65c929 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Reason]),
- rabbit_log:info("closing TCP connection ~p~n",
- [self()]),
+ {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
+ [self(), Reason]),
exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = rabbit_misc:ntoab(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ ConnStr = socket_op(Sock, fun (Sock0) ->
+ rabbit_net:connection_string(
+ Sock0, inbound)
+ end),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
@@ -223,17 +224,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
- handshake, 8))
+ handshake, 8)),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
catch
- Ex -> (if Ex == connection_closed_abruptly ->
- fun rabbit_log:warning/2;
- true ->
- fun rabbit_log:error/2
- end)("exception on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> log(case Ex of
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end, "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), ConnStr, Ex])
after
- rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
@@ -404,8 +403,8 @@ handle_dependent_exit(ChPid, Reason, State) ->
{_Channel, controlled} ->
maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
+ log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
maybe_close(handle_exception(control_throttle(State),
Channel, Reason))
end.
@@ -449,9 +448,10 @@ wait_for_channel_termination(N, TimerRef) ->
{_Channel, controlled} ->
wait_for_channel_termination(N-1, TimerRef);
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
+ log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9afb95b98f..96acbd9b51 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2222,14 +2222,26 @@ test_amqqueue(Durable) ->
#amqqueue { durable = Durable }.
with_fresh_variable_queue(Fun) ->
- ok = empty_test_queue(),
- VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
- assert_props(S0, [{q1, 0}, {q2, 0},
- {delta, {delta, undefined, 0, undefined}},
- {q3, 0}, {q4, 0},
- {len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
+ Ref = make_ref(),
+ Me = self(),
+ %% Run in a separate process since rabbit_msg_store will send
+ %% bump_credit messages and we want to ignore them
+ spawn_link(fun() ->
+ ok = empty_test_queue(),
+ VQ = variable_queue_init(test_amqqueue(true), false),
+ S0 = rabbit_variable_queue:status(VQ),
+ assert_props(S0, [{q1, 0}, {q2, 0},
+ {delta,
+ {delta, undefined, 0, undefined}},
+ {q3, 0}, {q4, 0},
+ {len, 0}]),
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ end),
+ receive
+ Ref -> ok
+ end,
passed.
publish_and_confirm(Q, Payload, Count) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 63a0927f77..9b45b55852 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -870,17 +870,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 8678c2c9e0..88da74c523 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- try
- %% report
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
- {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [rabbit_misc:ntoab(Address), Port,
- rabbit_misc:ntoab(PeerAddress), PeerPort]),
- %% In the event that somebody floods us with connections we can spew
- %% the above message at error_logger faster than it can keep up.
- %% So error_logger's mailbox grows unbounded until we eat all the
- %% memory available and crash. So here's a meaningless synchronous call
- %% to the underlying gen_event mechanism - when it returns the mailbox
- %% is drained.
- gen_event:which_handlers(error_logger),
- %% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain()
- catch {inet_error, Reason} ->
- gen_tcp:close(Sock),
- error_logger:error_msg("unable to accept TCP connection: ~p~n",
- [Reason])
- end,
+ %% handle
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain(),
%% accept more
accept(State);
@@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {AddressS, Port} = case inet:sockname(LSock) of
+ {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
+ {error, _} -> {"unknown", unknown}
+ end,
error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [rabbit_misc:ntoab(Address), Port, Reason]),
+ [AddressS, Port, Reason]),
accept(State);
handle_info(_Info, State) ->
@@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};