diff options
| author | Steve Powell <steve@rabbitmq.com> | 2012-02-01 10:57:20 +0000 |
|---|---|---|
| committer | Steve Powell <steve@rabbitmq.com> | 2012-02-01 10:57:20 +0000 |
| commit | ac8c7a57af4b381d44a62ef0ba0447c1740072af (patch) | |
| tree | b899eb7eb70d87354b0098589f00934925a660f8 | |
| parent | 1a36ebfe14727586c600ea2d031db5efeddc3959 (diff) | |
| parent | 33a91402a7bbe105f514557b5de4be70fbf3a798 (diff) | |
| download | rabbitmq-server-git-ac8c7a57af4b381d44a62ef0ba0447c1740072af.tar.gz | |
Merge default in
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | src/credit_flow.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 | ||||
| -rw-r--r-- | src/tcp_acceptor.erl | 34 |
16 files changed, 218 insertions, 140 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9301af6bdc..2fee1114aa 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -37,6 +37,7 @@ {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, + {log_levels, [{connection, info}]}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c38eca7ccd..f6a8a30319 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -95,6 +95,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(CREDIT_DISC_BOUND, {2000, 1500}). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 7df6c92a6d..397aa19169 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -16,8 +16,8 @@ -module(credit_flow). -%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep -%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more +%% Credit starts at MaxCredit and goes down. Both sides keep +%% track. When the receiver goes below MoreCreditAt it issues more %% credit by sending a message to the sender. The sender should pass %% this message in to handle_bump_msg/1. The sender should block when %% it goes below 0 (check by invoking blocked/0). If a process is both @@ -25,10 +25,9 @@ %% senders when it is itself blocked - thus the only processes that %% need to check blocked/0 are ones that read from network sockets. --define(MAX_CREDIT, 200). --define(MORE_CREDIT_AT, 150). +-define(DEFAULT_CREDIT, {200, 150}). --export([ack/1, handle_bump_msg/1, blocked/0, send/1]). +-export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -36,11 +35,14 @@ -ifdef(use_specs). -opaque(bump_msg() :: {pid(), non_neg_integer()}). +-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}). -spec(ack/1 :: (pid()) -> 'ok'). +-spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). -spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). -spec(blocked/0 :: () -> boolean()). -spec(send/1 :: (pid()) -> 'ok'). +-spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(peer_down/1 :: (pid()) -> 'ok'). -endif. @@ -55,12 +57,18 @@ %% variable names are used in credit bookkeeping and want to make %% sense internally). -ack(To) -> +%% For any given pair of processes, ack/2 and send/2 must always be +%% called with the same credit_spec(). + +ack(To) -> ack(To, ?DEFAULT_CREDIT). + +ack(To, {MaxCredit, MoreCreditAt}) -> + MoreCreditAt1 = MoreCreditAt + 1, Credit = - case get({credit_to, To}, ?MAX_CREDIT) of - ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT), - ?MAX_CREDIT; - C -> C - 1 + case get({credit_to, To}, MaxCredit) of + MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt), + MaxCredit; + C -> C - 1 end, put({credit_to, To}, Credit). @@ -76,8 +84,10 @@ handle_bump_msg({From, MoreCredit}) -> blocked() -> get(credit_blocked, []) =/= []. -send(From) -> - Credit = get({credit_from, From}, ?MAX_CREDIT) - 1, +send(From) -> send(From, ?DEFAULT_CREDIT). + +send(From, {MaxCredit, _MoreCreditAt}) -> + Credit = get({credit_from, From}, MaxCredit) - 1, case Credit of 0 -> block(From); _ -> ok diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c21db21be2..f63a09d310 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1159,6 +1159,10 @@ handle_info(timeout, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 603091b1e0..d8f5508573 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1085,9 +1085,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_acks = TAL}) -> - ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)), - State1 = new_tx(State), - {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + ack(TAL, State1), + {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8f58f848ee..83cead6e03 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). @@ -31,7 +31,15 @@ -ifdef(use_specs). +-export_type([level/0]). + +-type(category() :: atom()). +-type(level() :: 'info' | 'warning' | 'error'). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + +-spec(log/3 :: (category(), level(), string()) -> 'ok'). +-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -42,53 +50,44 @@ -endif. %%---------------------------------------------------------------------------- - start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). -info(Fmt) -> - gen_server:cast(?SERVER, {info, Fmt}). - -info(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {info, Fmt, Args}). +log(Category, Level, Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). -warning(Fmt) -> - gen_server:cast(?SERVER, {warning, Fmt}). - -warning(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {warning, Fmt, Args}). - -error(Fmt) -> - gen_server:cast(?SERVER, {error, Fmt}). - -error(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {error, Fmt, Args}). +info(Fmt) -> log(default, info, Fmt). +info(Fmt, Args) -> log(default, info, Fmt, Args). +warning(Fmt) -> log(default, warning, Fmt). +warning(Fmt, Args) -> log(default, warning, Fmt, Args). +error(Fmt) -> log(default, error, Fmt). +error(Fmt, Args) -> log(default, error, Fmt, Args). %%-------------------------------------------------------------------- -init([]) -> {ok, none}. +init([]) -> + {ok, CatLevelList} = application:get_env(log_levels), + CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList], + {ok, orddict:from_list(CatLevels)}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({info, Fmt}, State) -> - error_logger:info_msg(Fmt), - {noreply, State}; -handle_cast({info, Fmt, Args}, State) -> - error_logger:info_msg(Fmt, Args), - {noreply, State}; -handle_cast({warning, Fmt}, State) -> - error_logger:warning_msg(Fmt), - {noreply, State}; -handle_cast({warning, Fmt, Args}, State) -> - error_logger:warning_msg(Fmt, Args), - {noreply, State}; -handle_cast({error, Fmt}, State) -> - error_logger:error_msg(Fmt), - {noreply, State}; -handle_cast({error, Fmt, Args}, State) -> - error_logger:error_msg(Fmt, Args), - {noreply, State}; +handle_cast({log, Category, Level, Fmt, Args}, CatLevels) -> + CatLevel = case orddict:find(Category, CatLevels) of + {ok, L} -> L; + error -> level(info) + end, + case level(Level) =< CatLevel of + false -> ok; + true -> (case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end)(Fmt, Args) + end, + {noreply, CatLevels}; handle_cast(_Msg, State) -> {noreply, State}. @@ -101,3 +100,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- + +level(info) -> 3; +level(warning) -> 2; +level(error) -> 1; +level(none) -> 0. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 8ed2bede32..ee64b5a856 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> true = link(GM), GM end, - {ok, _TRef} = - timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, monitors = dict:new(), @@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids}, end, Monitors, Pids), noreply(State #state { monitors = Monitors1 }). +handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> + gm:broadcast(GM, heartbeat), + ensure_gm_heartbeat(), + noreply(State); + handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, death_fun = DeathFun }) -> @@ -419,3 +423,6 @@ noreply(State) -> reply(Reply, State) -> {reply, Reply, State, hibernate}. + +ensure_gm_heartbeat() -> + erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat). diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index baebc52b27..d1caf5aa68 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -137,7 +137,7 @@ add_mirror(Queue, MirrorNode) -> [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), rabbit_log:info( - "Adding mirror of queue ~s on node ~p: ~p~n", + "Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of {ok, _Pid} -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 06c5beac01..2cdc763723 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -253,6 +253,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e6a32b9023..495e29762a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2]). + write/3, write_flow/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -152,6 +152,7 @@ -spec(close_all_indicated/1 :: (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). @@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( - Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), + Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(MsgId, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef }) -> - ok = client_update_flying(+1, MsgId, CState), - ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), - ok = server_cast(CState, {write, CRef, MsgId}). +write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> + credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND), + client_write(MsgId, Msg, flow, CState). + +write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). +client_write(MsgId, Msg, Flow, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId, Flow}). + client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), Clients = dict:from_list( - [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]), + [{CRef, {undefined, undefined, undefined}} || + CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both %% recovered correctly. true = case {FileSummaryRecovered, CleanShutdown} of @@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _MsgId} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; + {read, _MsgId} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -755,7 +764,7 @@ prioritise_info(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, +handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> - Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), + Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -789,11 +798,19 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> + {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:peer_down(CPid), State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, - State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> +handle_cast({write, CRef, MsgId, Flow}, + State = #msstate { cur_file_cache_ets = CurFileCacheEts, + clients = Clients }) -> + case Flow of + flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:ack(CPid, ?CREDIT_DISC_BOUND); + noflow -> ok + end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> @@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of - {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), - State #msstate { - cref_to_msg_ids = CTM1 } + {_CPid, undefined, _CloseFDsFun} -> State; + {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. record_pending_confirm(CRef, MsgId, State) -> @@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> case (ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke) of true -> case dict:fetch(Ref, ClientRefs) of - {_MsgOnDiskFun, undefined} -> ok; - {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() + {_CPid, _MsgOnDiskFun, undefined} -> + ok; + {_CPid, _MsgOnDiskFun, CloseFDsFun} -> + ok = CloseFDsFun() end; false -> ok end diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b944ec81a1..fef8ae88ac 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -19,7 +19,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1]). + sockname/1, peername/1, peercert/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -62,6 +62,8 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). +-spec(connection_string/2 :: + (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -endif. @@ -141,3 +143,20 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. + +connection_string(Sock, Direction) -> + {From, To} = case Direction of + inbound -> {fun peername/1, fun sockname/1}; + outbound -> {fun sockname/1, fun peername/1} + end, + case {From(Sock), To(Sock)} of + {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> + {ok, lists:flatten( + io_lib:format("~s:~p -> ~s:~p", + [rabbit_misc:ntoab(FromAddress), FromPort, + rabbit_misc:ntoab(ToAddress), ToPort]))}; + {{error, _Reason} = Error, _} -> + Error; + {_, {error, _Reason} = Error} -> + Error + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 923967ead1..7355704a95 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) -> fun (Sock) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection ~p to SSL~n", - [self()]), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, Reason} -> {error, {ssl_upgrade_error, Reason}}; @@ -271,6 +269,16 @@ start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, + + %% In the event that somebody floods us with connections, the + %% reader processes can spew log events at error_logger faster + %% than it can keep up, causing its mailbox to grow unbounded + %% until we eat all the memory available and crash. So here is a + %% meaningless synchronous call to the underlying gen_event + %% mechanism. When it returns the mailbox is drained, and we + %% return to our caller to accept more connetions. + gen_event:which_handlers(error_logger), + Reader. start_client(Sock) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6e2ddedb58..5ebe65c929 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Reason]), - rabbit_log:info("closing TCP connection ~p~n", - [self()]), + {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", + [self(), Reason]), exit(normal) end. start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), - PeerAddressS = rabbit_misc:ntoab(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + ConnStr = socket_op(Sock, fun (Sock0) -> + rabbit_net:connection_string( + Sock0, inbound) + end), + log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), @@ -223,17 +224,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), - handshake, 8)) + handshake, 8)), + log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) catch - Ex -> (if Ex == connection_closed_abruptly -> - fun rabbit_log:warning/2; - true -> - fun rabbit_log:error/2 - end)("exception on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> log(case Ex of + connection_closed_abruptly -> warning; + _ -> error + end, "closing AMQP connection ~p (~s):~n~p~n", + [self(), ConnStr, Ex]) after - rabbit_log:info("closing TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), %% We don't close the socket explicitly. The reader is the %% controlling process and hence its termination will close %% the socket. Furthermore, gen_tcp:close/1 waits for pending @@ -404,8 +403,8 @@ handle_dependent_exit(ChPid, Reason, State) -> {_Channel, controlled} -> maybe_close(control_throttle(State)); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), + log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), maybe_close(handle_exception(control_throttle(State), Channel, Reason)) end. @@ -449,9 +448,10 @@ wait_for_channel_termination(N, TimerRef) -> {_Channel, controlled} -> wait_for_channel_termination(N-1, TimerRef); {Channel, uncontrolled} -> - rabbit_log:error("connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), + log(error, + "AMQP connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9afb95b98f..96acbd9b51 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2222,14 +2222,26 @@ test_amqqueue(Durable) -> #amqqueue { durable = Durable }. with_fresh_variable_queue(Fun) -> - ok = empty_test_queue(), - VQ = variable_queue_init(test_amqqueue(true), false), - S0 = rabbit_variable_queue:status(VQ), - assert_props(S0, [{q1, 0}, {q2, 0}, - {delta, {delta, undefined, 0, undefined}}, - {q3, 0}, {q4, 0}, - {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), + Ref = make_ref(), + Me = self(), + %% Run in a separate process since rabbit_msg_store will send + %% bump_credit messages and we want to ignore them + spawn_link(fun() -> + ok = empty_test_queue(), + VQ = variable_queue_init(test_amqqueue(true), false), + S0 = rabbit_variable_queue:status(VQ), + assert_props(S0, [{q1, 0}, {q2, 0}, + {delta, + {delta, undefined, 0, undefined}}, + {q3, 0}, {q4, 0}, + {len, 0}]), + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + end), + receive + Ref -> ok + end, passed. publish_and_confirm(Q, Payload, Count) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 63a0927f77..9b45b55852 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -870,17 +870,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:read(MsgId, MSCState1) + end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). + fun (MCSState1) -> + rabbit_msg_store:remove(MsgIds, MCSState1) + end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 8678c2c9e0..88da74c523 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - try - %% report - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), - {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [rabbit_misc:ntoab(Address), Port, - rabbit_misc:ntoab(PeerAddress), PeerPort]), - %% In the event that somebody floods us with connections we can spew - %% the above message at error_logger faster than it can keep up. - %% So error_logger's mailbox grows unbounded until we eat all the - %% memory available and crash. So here's a meaningless synchronous call - %% to the underlying gen_event mechanism - when it returns the mailbox - %% is drained. - gen_event:which_handlers(error_logger), - %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain() - catch {inet_error, Reason} -> - gen_tcp:close(Sock), - error_logger:error_msg("unable to accept TCP connection: ~p~n", - [Reason]) - end, + %% handle + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(), %% accept more accept(State); @@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {AddressS, Port} = case inet:sockname(LSock) of + {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; + {error, _} -> {"unknown", unknown} + end, error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", - [rabbit_misc:ntoab(Address), Port, Reason]), + [AddressS, Port, Reason]), accept(State); handle_info(_Info, State) -> @@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). - accept(State = #state{sock=LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; |
