summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-05 00:28:12 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-05 00:28:12 +0200
commite0bb5df5b5ee83e6542df4471ffe3dfd695deb04 (patch)
treef474157cc9892fd3fdbca7f041fa998ed5340f3b
parent7f27f432c2ac97a7cc12b57d652a522d4b9293cf (diff)
parentcba94b12742f436ddf423c432a2fb429961909bf (diff)
downloadrabbitmq-server-git-e0bb5df5b5ee83e6542df4471ffe3dfd695deb04.tar.gz
Merge branch 'master' into rabbitmq-server-336
-rw-r--r--docs/rabbitmq.config.example5
-rw-r--r--src/rabbit_error_logger_file_h.erl69
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_variable_queue.erl97
-rw-r--r--test/src/rabbit_tests.erl14
6 files changed, 148 insertions, 42 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index dc0fd5d4a5..3047f8f70e 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -161,6 +161,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max frame size the server will accept before connection
+ %% tuning occurs
+ %%
+ %% {initial_frame_max, 4096},
+
%% Set the max permissible number of channels per connection.
%% 0 means "no limit".
%%
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 65ab7fcca8..0758a98484 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -24,6 +24,27 @@
-export([safe_handle_event/3]).
+%% extracted from error_logger_file_h. Since 18.1 the state of the
+%% error logger module changed. See:
+%% https://github.com/erlang/otp/commit/003091a1fcc749a182505ef5675c763f71eacbb0#diff-d9a19ba08f5d2b60fadfc3aa1566b324R108
+%% github issue:
+%% https://github.com/rabbitmq/rabbitmq-server/issues/324
+-record(st, {fd,
+ filename,
+ prev_handler,
+ depth = unlimited}).
+
+%% extracted from error_logger_file_h. See comment above.
+get_depth() ->
+ case application:get_env(kernel, error_logger_format_depth) of
+ {ok, Depth} when is_integer(Depth) ->
+ max(10, Depth);
+ undefined ->
+ unlimited
+ end.
+
+-define(ERTS_NEW_LOGGER_STATE, "7.1").
+
%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h
%% module because the original's init/1 does not match properly
%% with the result of closing the old handler when swapping handlers.
@@ -34,16 +55,13 @@
%% lib/stdlib/src/error_logger_file_h.erl from R14B3 was copied as
%% init_file/2 and changed so that it opens the file in 'append' mode.
-%% Used only when swapping handlers in log rotation
+%% Used only when swapping handlers in log rotation, pre OTP 18.1
init({{File, Suffix}, []}) ->
- case rabbit_file:append_file(File, Suffix) of
- ok -> file:delete(File),
- ok;
- {error, Error} ->
- rabbit_log:error("Failed to append contents of "
- "log file '~s' to '~s':~n~p~n",
- [File, [File, Suffix], Error])
- end,
+ rotate_logs(File, Suffix),
+ init(File);
+%% Used only when swapping handlers in log rotation, since OTP 18.1
+init({{File, Suffix}, ok}) ->
+ rotate_logs(File, Suffix),
init(File);
%% Used only when swapping handlers and the original handler
%% failed to terminate or was never installed
@@ -65,18 +83,31 @@ init(File) ->
init_file(File, {error_logger, Buf}) ->
case init_file(File, error_logger) of
- {ok, {Fd, File, PrevHandler}} ->
- [handle_event(Event, {Fd, File, PrevHandler}) ||
+ {ok, State} ->
+ [handle_event(Event, State) ||
{_, Event} <- lists:reverse(Buf)],
- {ok, {Fd, File, PrevHandler}};
+ {ok, State};
Error ->
Error
end;
init_file(File, PrevHandler) ->
process_flag(trap_exit, true),
case file:open(File, [append]) of
- {ok,Fd} -> {ok, {Fd, File, PrevHandler}};
- Error -> Error
+ {ok, Fd} ->
+ FoundVer = erlang:system_info(version),
+ State =
+ case rabbit_misc:version_compare(
+ ?ERTS_NEW_LOGGER_STATE, FoundVer, lte) of
+ true ->
+ #st{fd = Fd,
+ filename = File,
+ prev_handler = PrevHandler,
+ depth = get_depth()};
+ _ ->
+ {Fd, File, PrevHandler}
+ end,
+ {ok, State};
+ Error -> Error
end.
handle_event(Event, State) ->
@@ -134,3 +165,13 @@ code_change(OldVsn, State, Extra) ->
%%----------------------------------------------------------------------
t(Term) -> truncate:log_event(Term, ?LOG_TRUNC).
+
+rotate_logs(File, Suffix) ->
+ case rabbit_file:append_file(File, Suffix) of
+ ok -> file:delete(File),
+ ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to append contents of "
+ "log file '~s' to '~s':~n~p~n",
+ [File, [File, Suffix], Error])
+ end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b33425c5a3..b8b197de49 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -269,11 +269,11 @@ reset_state(#qistate{ dir = Dir,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
journal_handle = JournalHdl }) ->
- ok = erase_index_dir(Dir),
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
end,
+ ok = erase_index_dir(Dir),
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
init(Name, OnSyncFun, OnSyncMsgFun) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 58c5aef8fd..f66a80d811 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -336,6 +336,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
+ InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
@@ -352,7 +353,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
protocol = none,
user = none,
timeout_sec = (HandshakeTimeout / 1000),
- frame_max = ?FRAME_MIN_SIZE,
+ frame_max = InitialFrameMax,
vhost = none,
client_properties = none,
capabilities = [],
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ee45a8d671..5e7672e7f0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -602,17 +602,9 @@ dropwhile(Pred, State) ->
{MsgProps, a(State1)}.
fetchwhile(Pred, Fun, Acc, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {undefined, Acc, a(State1)};
- {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> {Msg, State2} = read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(true, MsgStatus, State2),
- fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
- false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
- end
- end.
+ {MsgProps, Acc1, State1} =
+ fetch_by_predicate(Pred, Fun, Acc, State),
+ {MsgProps, Acc1, a(State1)}.
fetch(AckRequired, State) ->
case queue_out(State) of
@@ -1392,6 +1384,59 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
State2 #vqstate {
out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+%% This function exists as a way to improve fetchwhile/4
+%% performance. The idea of having this function is to optimise calls
+%% to rabbit_queue_index by batching delivers, instead of sending them
+%% one by one.
+%%
+%% Fun is the function passed to fetchwhile/4 that's
+%% applied to every fetched message and used to build the fetchwhile/4
+%% result accumulator FetchAcc.
+fetch_by_predicate(Pred, Fun, FetchAcc,
+ State = #vqstate {
+ index_state = IndexState,
+ out_counter = OutCount}) ->
+ {MsgProps, QAcc, State1} =
+ collect_by_predicate(Pred, ?QUEUE:new(), State),
+
+ {Delivers, FetchAcc1, State2} =
+ process_queue_entries(QAcc, Fun, FetchAcc, State1),
+
+ IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
+
+ {MsgProps, FetchAcc1, maybe_update_rates(
+ State2 #vqstate {
+ index_state = IndexState1,
+ out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
+%% We try to do here the same as what remove(true, State) does but
+%% processing several messages at the same time. The idea is to
+%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
+%% SeqIds instead of one by one, thus process_queue_entries1 will
+%% accumulate the required deliveries, will record_pending_ack for
+%% each message, and will update stats, like remove/2 does.
+%%
+%% For the meaning of Fun and FetchAcc arguments see
+%% fetch_by_predicate/4 above.
+process_queue_entries(Q, Fun, FetchAcc, State) ->
+ ?QUEUE:foldl(fun (MsgStatus, Acc) ->
+ process_queue_entries1(MsgStatus, Fun, Acc)
+ end,
+ {[], FetchAcc, State}, Q).
+
+process_queue_entries1(
+ #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk} = MsgStatus,
+ Fun,
+ {Delivers, FetchAcc, State}) ->
+ {Msg, State1} = read_msg(MsgStatus, State),
+ State2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State1),
+ {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
+ Fun(Msg, SeqId, FetchAcc),
+ stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
+
collect_by_predicate(Pred, QAcc, State) ->
case queue_out(State) of
{empty, State1} ->
@@ -2046,19 +2091,23 @@ reduce_memory_use(State = #vqstate {
State2
end,
- case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- S2 when S2 >= IoBatchSize ->
- %% There is an implicit, but subtle, upper bound here. We
- %% may shuffle a lot of messages from Q2/3 into delta, but
- %% the number of these that require any disk operation,
- %% namely index writing, i.e. messages that are genuine
- %% betas and not gammas, is bounded by the credit_flow
- %% limiting of the alpha->beta conversion above.
- push_betas_to_deltas(S2, State1);
- _ ->
- State1
- end.
+ State3 =
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ S2 when S2 >= IoBatchSize ->
+ %% There is an implicit, but subtle, upper bound here. We
+ %% may shuffle a lot of messages from Q2/3 into delta, but
+ %% the number of these that require any disk operation,
+ %% namely index writing, i.e. messages that are genuine
+ %% betas and not gammas, is bounded by the credit_flow
+ %% limiting of the alpha->beta conversion above.
+ push_betas_to_deltas(S2, State1);
+ _ ->
+ State1
+ end,
+ %% See rabbitmq-server-290 for the reasons behind this GC call.
+ garbage_collect(),
+ State3.
limit_ram_acks(0, State) ->
{0, ui(State)};
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 36e6aa920d..7fbfea2a4b 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -1914,9 +1914,19 @@ add_log_handlers(Handlers) ->
{Handler, Args} <- Handlers],
ok.
+%% sasl_report_file_h returns [] during terminate
+%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
+%%
+%% error_logger_file_h returns ok since OTP 18.1
+%% see: https://github.com/erlang/otp/blob/maint/lib/stdlib/src/error_logger_file_h.erl#L98
delete_log_handlers(Handlers) ->
- [[] = error_logger:delete_report_handler(Handler) ||
- Handler <- Handlers],
+ [ok_or_empty_list(error_logger:delete_report_handler(Handler))
+ || Handler <- Handlers],
+ ok.
+
+ok_or_empty_list([]) ->
+ [];
+ok_or_empty_list(ok) ->
ok.
test_supervisor_delayed_restart() ->