summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-08-27 18:11:08 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-08-27 18:11:08 +0300
commita576769d5aff720938077f8a75aa33d5a4ae7c7f (patch)
tree95d02ff32fed8d2dc01c6f7b9f4ecec254ab2413
parent67c24aa1ccf5209660d54ad49033a1d1e3cc0502 (diff)
downloadrabbitmq-server-git-a576769d5aff720938077f8a75aa33d5a4ae7c7f.tar.gz
Merge stable into master
-rw-r--r--ebin/rabbit_app.in7
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit.erl84
-rw-r--r--src/rabbit_msg_store.erl33
-rw-r--r--src/rabbit_networking.erl18
-rw-r--r--src/rabbit_queue_index.erl3
-rw-r--r--src/rabbit_variable_queue.erl21
-rw-r--r--test/src/on_disk_store_tunable_parameter_validation_test.erl47
-rw-r--r--test/src/rabbit_tests.erl1
9 files changed, 196 insertions, 22 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 48edbd4d1e..4bfdd192f5 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -30,7 +30,7 @@
{msg_store_file_size_limit, 16777216},
{fhc_write_buffering, true},
{fhc_read_buffering, true},
- {queue_index_max_journal_entries, 65536},
+ {queue_index_max_journal_entries, 32768},
{queue_index_embed_msgs_below, 4096},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
@@ -85,5 +85,8 @@
%% see rabbitmq-server#114
{mirroring_flow_control, true},
{credit_flow_initial_credit, 200},
- {credit_flow_more_credit_after, 50}
+ {credit_flow_more_credit_after, 50},
+ %% see rabbitmq-server#227 and related tickets
+ {msg_store_credit_disc_bound, {2000, 500}},
+ {msg_store_io_batch_size, 2048}
]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d16a38f981..9ad99a754a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -125,6 +125,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
+%% When we discover that we should write some indices to disk for some
+%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
+%% due to write indices for before we do any work at all.
+-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5152c11eeb..d11b8d95a5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -30,6 +30,9 @@
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+%% for tests
+-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
+
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
@@ -517,6 +520,7 @@ start(normal, []) ->
print_banner(),
log_banner(),
warn_if_kernel_config_dubious(),
+ warn_if_disc_io_options_dubious(),
rabbit_boot_steps:run_boot_steps(),
{ok, SupPid};
Error ->
@@ -788,6 +792,86 @@ warn_if_kernel_config_dubious() ->
true -> ok
end.
+warn_if_disc_io_options_dubious() ->
+ %% if these values are not set, it doesn't matter since
+ %% rabbit_variable_queue will pick up the values defined in the
+ %% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants.
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ undefined),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ undefined),
+ case catch validate_msg_store_io_batch_size_and_credit_disc_bound(
+ CreditDiscBound, IoBatchSize) of
+ ok -> ok;
+ {error, {Reason, Vars}} ->
+ rabbit_log:warning(Reason, Vars)
+ end.
+
+validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound,
+ IoBatchSize) ->
+ case IoBatchSize of
+ undefined ->
+ ok;
+ IoBatchSize when is_integer(IoBatchSize) ->
+ if IoBatchSize < ?IO_BATCH_SIZE ->
+ throw({error,
+ {"io_batch_size of ~b lower than recommended value ~b, "
+ "paging performance may worsen~n",
+ [IoBatchSize, ?IO_BATCH_SIZE]}});
+ true ->
+ ok
+ end;
+ IoBatchSize ->
+ throw({error,
+ {"io_batch_size should be an integer, but ~b given",
+ [IoBatchSize]}})
+ end,
+
+ %% CreditDiscBound = {InitialCredit, MoreCreditAfter}
+ {RIC, RMCA} = ?CREDIT_DISC_BOUND,
+ case CreditDiscBound of
+ undefined ->
+ ok;
+ {IC, MCA} when is_integer(IC), is_integer(MCA) ->
+ if IC < RIC; MCA < RMCA ->
+ throw({error,
+ {"msg_store_credit_disc_bound {~b, ~b} lower than"
+ "recommended value {~b, ~b},"
+ " paging performance may worsen~n",
+ [IC, MCA, RIC, RMCA]}});
+ true ->
+ ok
+ end;
+ {IC, MCA} ->
+ throw({error,
+ {"both msg_store_credit_disc_bound values should be integers, but ~p given",
+ [{IC, MCA}]}});
+ CreditDiscBound ->
+ throw({error,
+ {"invalid msg_store_credit_disc_bound value given: ~p",
+ [CreditDiscBound]}})
+ end,
+
+ case {CreditDiscBound, IoBatchSize} of
+ {undefined, undefined} ->
+ ok;
+ {_CDB, undefined} ->
+ ok;
+ {undefined, _IBS} ->
+ ok;
+ {{InitialCredit, _MCA}, IoBatchSize} ->
+ if IoBatchSize < InitialCredit ->
+ throw(
+ {error,
+ {"msg_store_io_batch_size ~b should be bigger than the initial "
+ "credit value from msg_store_credit_disc_bound ~b,"
+ " paging performance may worsen~n",
+ [IoBatchSize, InitialCredit]}});
+ true ->
+ ok
+ end
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b94ed6e1b7..1a6beb5438 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -99,7 +99,9 @@
%% how big are our files allowed to get?
file_size_limit,
%% client ref to synced messages mapping
- cref_to_msg_ids
+ cref_to_msg_ids,
+ %% See CREDIT_DISC_BOUND in rabbit.hrl
+ credit_disc_bound
}).
-record(client_msstate,
@@ -113,7 +115,8 @@
file_handles_ets,
file_summary_ets,
cur_file_cache_ets,
- flying_ets
+ flying_ets,
+ credit_disc_bound
}).
-record(file_summary,
@@ -156,7 +159,8 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
- flying_ets :: ets:tid()}).
+ flying_ets :: ets:tid(),
+ credit_disc_bound :: {pos_integer(), pos_integer()}}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -476,6 +480,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
infinity),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -486,7 +492,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_ets = FlyingEts }.
+ flying_ets = FlyingEts,
+ credit_disc_bound = CreditDiscBound }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -499,8 +506,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
- credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+write_flow(MsgId, Msg,
+ CState = #client_msstate {
+ server = Server,
+ credit_disc_bound = CreditDiscBound }) ->
+ credit_flow:send(whereis(Server), CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -743,6 +753,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
msg_store = self()
}),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -762,7 +775,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new()
+ cref_to_msg_ids = dict:new(),
+ credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -846,10 +860,11 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
- clients = Clients }) ->
+ clients = Clients,
+ credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 5d87743472..f95f8c5818 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -475,8 +475,22 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
tcp_opts() ->
- {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
- Opts.
+ {ok, ConfigOpts} = application:get_env(rabbit, tcp_listen_options),
+ merge_essential_tcp_listen_options(ConfigOpts).
+
+-define(ESSENTIAL_LISTEN_OPTIONS,
+ [binary,
+ {active, false},
+ {packet, raw},
+ {reuseaddr, true},
+ {nodelay, true}]).
+
+merge_essential_tcp_listen_options(Opts) ->
+ lists:foldl(fun ({K, _} = Opt, Acc) ->
+ lists:keystore(K, 1, Acc, Opt);
+ (Opt, Acc) ->
+ [Opt | Acc]
+ end , Opts, ?ESSENTIAL_LISTEN_OPTIONS).
%% inet_parse:address takes care of ip string, like "0.0.0.0"
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0129b848ce..544b536aa2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -127,7 +127,8 @@
%% binary generation/matching with constant vs variable lengths.
-define(REL_SEQ_BITS, 14).
--define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
+%% calculated as trunc(math:pow(2,?REL_SEQ_BITS))).
+-define(SEGMENT_ENTRY_COUNT, 16384).
%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f66ba21605..2224a74b59 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -297,7 +297,9 @@
%% Unlike the other counters these two do not feed into
%% #rates{} and get reset
disk_read_count,
- disk_write_count
+ disk_write_count,
+
+ io_batch_size
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -320,10 +322,6 @@
end_seq_id %% end_seq_id is exclusive
}).
-%% When we discover that we should write some indices to disk for some
-%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
-%% due to write indices for before we do any work at all.
--define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -395,7 +393,9 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
- disk_write_count :: non_neg_integer() }).
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer()}).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -1196,6 +1196,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
end_seq_id = NextSeqId })
end,
Now = time_compat:monotonic_time(),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ ?IO_BATCH_SIZE),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
@@ -1232,7 +1234,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ack_out_counter = 0,
ack_in_counter = 0,
disk_read_count = 0,
- disk_write_count = 0 },
+ disk_write_count = 0,
+
+ io_batch_size = IoBatchSize },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1809,6 +1813,7 @@ reduce_memory_use(State = #vqstate {
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
+ io_batch_size = IoBatchSize,
rates = #rates { in = AvgIngress,
out = AvgEgress,
ack_in = AvgAckIngress,
@@ -1836,7 +1841,7 @@ reduce_memory_use(State = #vqstate {
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- S2 when S2 >= ?IO_BATCH_SIZE ->
+ S2 when S2 >= IoBatchSize ->
%% There is an implicit, but subtle, upper bound here. We
%% may shuffle a lot of messages from Q2/3 into delta, but
%% the number of these that require any disk operation,
diff --git a/test/src/on_disk_store_tunable_parameter_validation_test.erl b/test/src/on_disk_store_tunable_parameter_validation_test.erl
new file mode 100644
index 0000000000..9db5425e6d
--- /dev/null
+++ b/test/src/on_disk_store_tunable_parameter_validation_test.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(on_disk_store_tunable_parameter_validation_test).
+
+-include("rabbit.hrl").
+
+-export([test_msg_store_parameter_validation/0]).
+
+-define(T(Fun, Args), (catch apply(rabbit, Fun, Args))).
+
+test_msg_store_parameter_validation() ->
+ %% make sure it works with default values
+ ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [?CREDIT_DISC_BOUND, ?IO_BATCH_SIZE]),
+
+ %% IO_BATCH_SIZE must be greater than CREDIT_DISC_BOUND initial credit
+ ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 3000]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 1500]),
+
+ %% All values must be integers
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, "1500"]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{"2000", 500}, abc]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, "500"}, 2048]),
+
+ %% CREDIT_DISC_BOUND must be a tuple
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [[2000, 500], 1500]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [2000, 1500]),
+
+ %% config values can't be smaller than default values
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{1999, 500}, 2048]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 499}, 2048]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 2047]),
+
+ passed.
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 4c1489f6aa..891af272fb 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -89,6 +89,7 @@ all_tests0() ->
passed = test_configurable_server_properties(),
passed = vm_memory_monitor_tests:all_tests(),
passed = credit_flow_test:test_credit_flow_settings(),
+ passed = on_disk_store_tunable_parameter_validation_test:test_msg_store_parameter_validation(),
passed.
do_if_secondary_node(Up, Down) ->