diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-05 00:28:12 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-05 00:28:12 +0200 |
| commit | e0bb5df5b5ee83e6542df4471ffe3dfd695deb04 (patch) | |
| tree | f474157cc9892fd3fdbca7f041fa998ed5340f3b | |
| parent | 7f27f432c2ac97a7cc12b57d652a522d4b9293cf (diff) | |
| parent | cba94b12742f436ddf423c432a2fb429961909bf (diff) | |
| download | rabbitmq-server-git-e0bb5df5b5ee83e6542df4471ffe3dfd695deb04.tar.gz | |
Merge branch 'master' into rabbitmq-server-336
| -rw-r--r-- | docs/rabbitmq.config.example | 5 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 97 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 14 |
6 files changed, 148 insertions, 42 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index dc0fd5d4a5..3047f8f70e 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -161,6 +161,11 @@ %% %% {frame_max, 131072}, + %% Set the max frame size the server will accept before connection + %% tuning occurs + %% + %% {initial_frame_max, 4096}, + %% Set the max permissible number of channels per connection. %% 0 means "no limit". %% diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 65ab7fcca8..0758a98484 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -24,6 +24,27 @@ -export([safe_handle_event/3]). +%% extracted from error_logger_file_h. Since 18.1 the state of the +%% error logger module changed. See: +%% https://github.com/erlang/otp/commit/003091a1fcc749a182505ef5675c763f71eacbb0#diff-d9a19ba08f5d2b60fadfc3aa1566b324R108 +%% github issue: +%% https://github.com/rabbitmq/rabbitmq-server/issues/324 +-record(st, {fd, + filename, + prev_handler, + depth = unlimited}). + +%% extracted from error_logger_file_h. See comment above. +get_depth() -> + case application:get_env(kernel, error_logger_format_depth) of + {ok, Depth} when is_integer(Depth) -> + max(10, Depth); + undefined -> + unlimited + end. + +-define(ERTS_NEW_LOGGER_STATE, "7.1"). + %% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h %% module because the original's init/1 does not match properly %% with the result of closing the old handler when swapping handlers. @@ -34,16 +55,13 @@ %% lib/stdlib/src/error_logger_file_h.erl from R14B3 was copied as %% init_file/2 and changed so that it opens the file in 'append' mode. -%% Used only when swapping handlers in log rotation +%% Used only when swapping handlers in log rotation, pre OTP 18.1 init({{File, Suffix}, []}) -> - case rabbit_file:append_file(File, Suffix) of - ok -> file:delete(File), - ok; - {error, Error} -> - rabbit_log:error("Failed to append contents of " - "log file '~s' to '~s':~n~p~n", - [File, [File, Suffix], Error]) - end, + rotate_logs(File, Suffix), + init(File); +%% Used only when swapping handlers in log rotation, since OTP 18.1 +init({{File, Suffix}, ok}) -> + rotate_logs(File, Suffix), init(File); %% Used only when swapping handlers and the original handler %% failed to terminate or was never installed @@ -65,18 +83,31 @@ init(File) -> init_file(File, {error_logger, Buf}) -> case init_file(File, error_logger) of - {ok, {Fd, File, PrevHandler}} -> - [handle_event(Event, {Fd, File, PrevHandler}) || + {ok, State} -> + [handle_event(Event, State) || {_, Event} <- lists:reverse(Buf)], - {ok, {Fd, File, PrevHandler}}; + {ok, State}; Error -> Error end; init_file(File, PrevHandler) -> process_flag(trap_exit, true), case file:open(File, [append]) of - {ok,Fd} -> {ok, {Fd, File, PrevHandler}}; - Error -> Error + {ok, Fd} -> + FoundVer = erlang:system_info(version), + State = + case rabbit_misc:version_compare( + ?ERTS_NEW_LOGGER_STATE, FoundVer, lte) of + true -> + #st{fd = Fd, + filename = File, + prev_handler = PrevHandler, + depth = get_depth()}; + _ -> + {Fd, File, PrevHandler} + end, + {ok, State}; + Error -> Error end. handle_event(Event, State) -> @@ -134,3 +165,13 @@ code_change(OldVsn, State, Extra) -> %%---------------------------------------------------------------------- t(Term) -> truncate:log_event(Term, ?LOG_TRUNC). + +rotate_logs(File, Suffix) -> + case rabbit_file:append_file(File, Suffix) of + ok -> file:delete(File), + ok; + {error, Error} -> + rabbit_log:error("Failed to append contents of " + "log file '~s' to '~s':~n~p~n", + [File, [File, Suffix], Error]) + end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b33425c5a3..b8b197de49 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -269,11 +269,11 @@ reset_state(#qistate{ dir = Dir, on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun, journal_handle = JournalHdl }) -> - ok = erase_index_dir(Dir), ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) end, + ok = erase_index_dir(Dir), blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). init(Name, OnSyncFun, OnSyncMsgFun) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 58c5aef8fd..f66a80d811 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -336,6 +336,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), + InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = @@ -352,7 +353,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> protocol = none, user = none, timeout_sec = (HandshakeTimeout / 1000), - frame_max = ?FRAME_MIN_SIZE, + frame_max = InitialFrameMax, vhost = none, client_properties = none, capabilities = [], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ee45a8d671..5e7672e7f0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -602,17 +602,9 @@ dropwhile(Pred, State) -> {MsgProps, a(State1)}. fetchwhile(Pred, Fun, Acc, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, Acc, a(State1)}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(true, MsgStatus, State2), - fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); - false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} - end - end. + {MsgProps, Acc1, State1} = + fetch_by_predicate(Pred, Fun, Acc, State), + {MsgProps, Acc1, a(State1)}. fetch(AckRequired, State) -> case queue_out(State) of @@ -1392,6 +1384,59 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> State2 #vqstate { out_counter = OutCount + ?QUEUE:len(QAcc)})}. +%% This function exists as a way to improve fetchwhile/4 +%% performance. The idea of having this function is to optimise calls +%% to rabbit_queue_index by batching delivers, instead of sending them +%% one by one. +%% +%% Fun is the function passed to fetchwhile/4 that's +%% applied to every fetched message and used to build the fetchwhile/4 +%% result accumulator FetchAcc. +fetch_by_predicate(Pred, Fun, FetchAcc, + State = #vqstate { + index_state = IndexState, + out_counter = OutCount}) -> + {MsgProps, QAcc, State1} = + collect_by_predicate(Pred, ?QUEUE:new(), State), + + {Delivers, FetchAcc1, State2} = + process_queue_entries(QAcc, Fun, FetchAcc, State1), + + IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState), + + {MsgProps, FetchAcc1, maybe_update_rates( + State2 #vqstate { + index_state = IndexState1, + out_counter = OutCount + ?QUEUE:len(QAcc)})}. + +%% We try to do here the same as what remove(true, State) does but +%% processing several messages at the same time. The idea is to +%% optimize rabbit_queue_index:deliver/2 calls by sending a list of +%% SeqIds instead of one by one, thus process_queue_entries1 will +%% accumulate the required deliveries, will record_pending_ack for +%% each message, and will update stats, like remove/2 does. +%% +%% For the meaning of Fun and FetchAcc arguments see +%% fetch_by_predicate/4 above. +process_queue_entries(Q, Fun, FetchAcc, State) -> + ?QUEUE:foldl(fun (MsgStatus, Acc) -> + process_queue_entries1(MsgStatus, Fun, Acc) + end, + {[], FetchAcc, State}, Q). + +process_queue_entries1( + #msg_status { seq_id = SeqId, is_delivered = IsDelivered, + index_on_disk = IndexOnDisk} = MsgStatus, + Fun, + {Delivers, FetchAcc, State}) -> + {Msg, State1} = read_msg(MsgStatus, State), + State2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State1), + {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), + Fun(Msg, SeqId, FetchAcc), + stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. + collect_by_predicate(Pred, QAcc, State) -> case queue_out(State) of {empty, State1} -> @@ -2046,19 +2091,23 @@ reduce_memory_use(State = #vqstate { State2 end, - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); - _ -> - State1 - end. + State3 = + case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), + permitted_beta_count(State1)) of + S2 when S2 >= IoBatchSize -> + %% There is an implicit, but subtle, upper bound here. We + %% may shuffle a lot of messages from Q2/3 into delta, but + %% the number of these that require any disk operation, + %% namely index writing, i.e. messages that are genuine + %% betas and not gammas, is bounded by the credit_flow + %% limiting of the alpha->beta conversion above. + push_betas_to_deltas(S2, State1); + _ -> + State1 + end, + %% See rabbitmq-server-290 for the reasons behind this GC call. + garbage_collect(), + State3. limit_ram_acks(0, State) -> {0, ui(State)}; diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 36e6aa920d..7fbfea2a4b 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1914,9 +1914,19 @@ add_log_handlers(Handlers) -> {Handler, Args} <- Handlers], ok. +%% sasl_report_file_h returns [] during terminate +%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98 +%% +%% error_logger_file_h returns ok since OTP 18.1 +%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98 delete_log_handlers(Handlers) -> - [[] = error_logger:delete_report_handler(Handler) || - Handler <- Handlers], + [ok_or_empty_list(error_logger:delete_report_handler(Handler)) + || Handler <- Handlers], + ok. + +ok_or_empty_list([]) -> + []; +ok_or_empty_list(ok) -> ok. test_supervisor_delayed_restart() -> |
