summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl84
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_variable_queue.erl21
3 files changed, 120 insertions, 17 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7ee51f09f7..bb906ede4f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -30,6 +30,9 @@
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+%% for tests
+-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
+
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
@@ -520,6 +523,7 @@ start(normal, []) ->
print_banner(),
log_banner(),
warn_if_kernel_config_dubious(),
+ warn_if_disc_io_options_dubious(),
run_boot_steps(),
{ok, SupPid};
Error ->
@@ -848,6 +852,86 @@ warn_if_kernel_config_dubious() ->
true -> ok
end.
%% Warn (without failing boot) when the operator-configured disc IO
%% tuning values are likely to hurt paging performance.
%%
%% If these values are not set, it doesn't matter since
%% rabbit_variable_queue will pick up the values defined in the
%% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants.
warn_if_disc_io_options_dubious() ->
    CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
                                          undefined),
    IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
                                      undefined),
    %% The validator signals problems via throw({error, {Fmt, Args}}).
    %% Use try/catch with a throw: class pattern rather than old-style
    %% `case catch ...`: with `catch`, a genuine runtime error would
    %% surface as {'EXIT', ...}, match neither case clause, and crash
    %% the boot sequence with a case_clause instead of logging.
    try validate_msg_store_io_batch_size_and_credit_disc_bound(
          CreditDiscBound, IoBatchSize) of
        ok -> ok
    catch
        throw:{error, {Reason, Vars}} ->
            rabbit_log:warning(Reason, Vars)
    end.
+
%% Validate the msg_store_io_batch_size and msg_store_credit_disc_bound
%% application environment values (exported for tests).
%%
%% Returns 'ok' when both values are acceptable or unset.  The first
%% problem found is signalled with throw({error, {FormatString, Args}}),
%% which the caller turns into a log warning.
validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound,
                                                       IoBatchSize) ->
    case IoBatchSize of
        undefined ->
            ok;
        IoBatchSize when is_integer(IoBatchSize) ->
            if IoBatchSize < ?IO_BATCH_SIZE ->
                    throw({error,
                           {"io_batch_size of ~b lower than recommended value ~b, "
                            "paging performance may worsen~n",
                            [IoBatchSize, ?IO_BATCH_SIZE]}});
               true ->
                    ok
            end;
        IoBatchSize ->
            %% Must be ~p, not ~b: this clause only matches when the
            %% value is NOT an integer, so ~b would make the formatter
            %% itself crash instead of printing the warning.
            throw({error,
                   {"io_batch_size should be an integer, but ~p given",
                    [IoBatchSize]}})
    end,

    %% CreditDiscBound = {InitialCredit, MoreCreditAfter}
    {RIC, RMCA} = ?CREDIT_DISC_BOUND,
    case CreditDiscBound of
        undefined ->
            ok;
        {IC, MCA} when is_integer(IC), is_integer(MCA) ->
            if IC < RIC; MCA < RMCA ->
                    %% NB: trailing space after "lower than" -- adjacent
                    %% string literals concatenate, and without it the
                    %% warning read "lower thanrecommended".
                    throw({error,
                           {"msg_store_credit_disc_bound {~b, ~b} lower than "
                            "recommended value {~b, ~b},"
                            " paging performance may worsen~n",
                            [IC, MCA, RIC, RMCA]}});
               true ->
                    ok
            end;
        {IC, MCA} ->
            throw({error,
                   {"both msg_store_credit_disc_bound values should be integers, but ~p given",
                    [{IC, MCA}]}});
        CreditDiscBound ->
            throw({error,
                   {"invalid msg_store_credit_disc_bound value given: ~p",
                    [CreditDiscBound]}})
    end,

    %% Cross-check the two settings against each other.  Both values
    %% were individually validated above, so if both are set they are
    %% well-formed {integer, integer} / integer here.
    case {CreditDiscBound, IoBatchSize} of
        {undefined, undefined} ->
            ok;
        {_CDB, undefined} ->
            ok;
        {undefined, _IBS} ->
            ok;
        {{InitialCredit, _MCA}, IoBatchSize} ->
            if IoBatchSize < InitialCredit ->
                    throw(
                      {error,
                       {"msg_store_io_batch_size ~b should be bigger than the initial "
                        "credit value from msg_store_credit_disc_bound ~b,"
                        " paging performance may worsen~n",
                        [IoBatchSize, InitialCredit]}});
               true ->
                    ok
            end
    end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 02a3bd0f15..d0969f1b0e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -77,7 +77,8 @@
%% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- cref_to_msg_ids %% client ref to synced messages mapping
+ cref_to_msg_ids, %% client ref to synced messages mapping
+ credit_disc_bound %% See rabbit.hrl CREDIT_DISC_BOUND
}).
-record(client_msstate,
@@ -91,7 +92,8 @@
file_handles_ets,
file_summary_ets,
cur_file_cache_ets,
- flying_ets
+ flying_ets,
+ credit_disc_bound
}).
-record(file_summary,
@@ -134,7 +136,8 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
- flying_ets :: ets:tid()}).
+ flying_ets :: ets:tid(),
+ credit_disc_bound :: {pos_integer(), pos_integer()}}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -442,6 +445,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
infinity),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -452,7 +457,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_ets = FlyingEts }.
+ flying_ets = FlyingEts,
+ credit_disc_bound = CreditDiscBound }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -465,8 +471,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
- credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+%% Flow-controlled variant of write/3: calls credit_flow:send/2 against
+%% the msg_store server pid before delegating to client_write/4 in
+%% 'flow' mode.  The credit bound now comes from the client state
+%% record (populated in client_init from the msg_store_credit_disc_bound
+%% app env, defaulting to ?CREDIT_DISC_BOUND) instead of the previously
+%% hard-coded macro.
+write_flow(MsgId, Msg,
+           CState = #client_msstate {
+              server = Server,
+              credit_disc_bound = CreditDiscBound }) ->
+    credit_flow:send(whereis(Server), CreditDiscBound),
+    client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -709,6 +718,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
msg_store = self()
}),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -728,7 +740,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new()
+ cref_to_msg_ids = dict:new(),
+ credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -812,10 +825,11 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
- clients = Clients }) ->
+ clients = Clients,
+ credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 691e4ce2e2..4ccd9757e0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -297,7 +297,9 @@
%% Unlike the other counters these two do not feed into
%% #rates{} and get reset
disk_read_count,
- disk_write_count
+ disk_write_count,
+
+ io_batch_size
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -320,10 +322,6 @@
end_seq_id %% end_seq_id is exclusive
}).
-%% When we discover that we should write some indices to disk for some
-%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
-%% due to write indices for before we do any work at all.
--define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -396,7 +394,9 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
- disk_write_count :: non_neg_integer() }).
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer()}).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -1135,6 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
end_seq_id = NextSeqId })
end,
Now = now(),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ ?IO_BATCH_SIZE),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
@@ -1171,7 +1173,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ack_out_counter = 0,
ack_in_counter = 0,
disk_read_count = 0,
- disk_write_count = 0 },
+ disk_write_count = 0,
+
+ io_batch_size = IoBatchSize },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1748,6 +1752,7 @@ reduce_memory_use(State = #vqstate {
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
+ io_batch_size = IoBatchSize,
rates = #rates { in = AvgIngress,
out = AvgEgress,
ack_in = AvgAckIngress,
@@ -1775,7 +1780,7 @@ reduce_memory_use(State = #vqstate {
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- S2 when S2 >= ?IO_BATCH_SIZE ->
+ S2 when S2 >= IoBatchSize ->
%% There is an implicit, but subtle, upper bound here. We
%% may shuffle a lot of messages from Q2/3 into delta, but
%% the number of these that require any disk operation,