diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 84 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
3 files changed, 120 insertions, 17 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7ee51f09f7..bb906ede4f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -30,6 +30,9 @@ %% Boot steps. -export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). +%% for tests +-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]). + -rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}). -rabbit_boot_step({codec_correctness_check, @@ -520,6 +523,7 @@ start(normal, []) -> print_banner(), log_banner(), warn_if_kernel_config_dubious(), + warn_if_disc_io_options_dubious(), run_boot_steps(), {ok, SupPid}; Error -> @@ -848,6 +852,86 @@ warn_if_kernel_config_dubious() -> true -> ok end. +warn_if_disc_io_options_dubious() -> + %% if these values are not set, it doesn't matter since + %% rabbit_variable_queue will pick up the values defined in the + %% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants. + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + undefined), + IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, + undefined), + case catch validate_msg_store_io_batch_size_and_credit_disc_bound( + CreditDiscBound, IoBatchSize) of + ok -> ok; + {error, {Reason, Vars}} -> + rabbit_log:warning(Reason, Vars) + end. + +validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound, + IoBatchSize) -> + case IoBatchSize of + undefined -> + ok; + IoBatchSize when is_integer(IoBatchSize) -> + if IoBatchSize < ?IO_BATCH_SIZE -> + throw({error, + {"io_batch_size of ~b lower than recommended value ~b, " + "paging performance may worsen~n", + [IoBatchSize, ?IO_BATCH_SIZE]}}); + true -> + ok + end; + IoBatchSize -> + throw({error, + {"io_batch_size should be an integer, but ~b given", + [IoBatchSize]}}) + end, + + %% CreditDiscBound = {InitialCredit, MoreCreditAfter} + {RIC, RMCA} = ?CREDIT_DISC_BOUND, + case CreditDiscBound of + undefined -> + ok; + {IC, MCA} when is_integer(IC), is_integer(MCA) -> + if IC < RIC; MCA < RMCA -> + throw({error, + {"msg_store_credit_disc_bound {~b, ~b} lower than" + "recommended value {~b, ~b}," + " paging performance may worsen~n", + [IC, MCA, RIC, RMCA]}}); + true -> + ok + end; + {IC, MCA} -> + throw({error, + {"both msg_store_credit_disc_bound values should be integers, but ~p given", + [{IC, MCA}]}}); + CreditDiscBound -> + throw({error, + {"invalid msg_store_credit_disc_bound value given: ~p", + [CreditDiscBound]}}) + end, + + case {CreditDiscBound, IoBatchSize} of + {undefined, undefined} -> + ok; + {_CDB, undefined} -> + ok; + {undefined, _IBS} -> + ok; + {{InitialCredit, _MCA}, IoBatchSize} -> + if IoBatchSize < InitialCredit -> + throw( + {error, + {"msg_store_io_batch_size ~b should be bigger than the initial " + "credit value from msg_store_credit_disc_bound ~b," + " paging performance may worsen~n", + [IoBatchSize, InitialCredit]}}); + true -> + ok + end + end. + home_dir() -> case init:get_argument(home) of {ok, [[Home]]} -> Home; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 02a3bd0f15..d0969f1b0e 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -77,7 +77,8 @@ %% to callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - cref_to_msg_ids %% client ref to synced messages mapping + cref_to_msg_ids, %% client ref to synced messages mapping + credit_disc_bound %% See rabbit.hrl CREDIT_DISC_BOUND }). -record(client_msstate, @@ -91,7 +92,8 @@ file_handles_ets, file_summary_ets, cur_file_cache_ets, - flying_ets + flying_ets, + credit_disc_bound }). -record(file_summary, @@ -134,7 +136,8 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), cur_file_cache_ets :: ets:tid(), - flying_ets :: ets:tid()}). + flying_ets :: ets:tid(), + credit_disc_bound :: {pos_integer(), pos_integer()}}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). @@ -442,6 +445,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gen_server2:call( Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun}, infinity), + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -452,7 +457,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_ets = FlyingEts }. + flying_ets = FlyingEts, + credit_disc_bound = CreditDiscBound }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -465,8 +471,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> - credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND), +write_flow(MsgId, Msg, + CState = #client_msstate { + server = Server, + credit_disc_bound = CreditDiscBound }) -> + credit_flow:send(whereis(Server), CreditDiscBound), client_write(MsgId, Msg, flow, CState). write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). @@ -709,6 +718,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> msg_store = self() }), + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), + State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, @@ -728,7 +740,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - cref_to_msg_ids = dict:new() + cref_to_msg_ids = dict:new(), + credit_disc_bound = CreditDiscBound }, %% If we didn't recover the msg location index then we need to @@ -812,10 +825,11 @@ handle_cast({client_delete, CRef}, handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, - clients = Clients }) -> + clients = Clients, + credit_disc_bound = CreditDiscBound }) -> case Flow of flow -> {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:ack(CPid, ?CREDIT_DISC_BOUND); + credit_flow:ack(CPid, CreditDiscBound); noflow -> ok end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 691e4ce2e2..4ccd9757e0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -297,7 +297,9 @@ %% Unlike the other counters these two do not feed into %% #rates{} and get reset disk_read_count, - disk_write_count + disk_write_count, + + io_batch_size }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -320,10 +322,6 @@ end_seq_id %% end_seq_id is exclusive }). -%% When we discover that we should write some indices to disk for some -%% betas, the IO_BATCH_SIZE sets the number of betas that we must be -%% due to write indices for before we do any work at all. --define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -396,7 +394,9 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), disk_read_count :: non_neg_integer(), - disk_write_count :: non_neg_integer() }). + disk_write_count :: non_neg_integer(), + + io_batch_size :: pos_integer()}). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -1135,6 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, end_seq_id = NextSeqId }) end, Now = now(), + IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, + ?IO_BATCH_SIZE), State = #vqstate { q1 = ?QUEUE:new(), q2 = ?QUEUE:new(), @@ -1171,7 +1173,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, ack_out_counter = 0, ack_in_counter = 0, disk_read_count = 0, - disk_write_count = 0 }, + disk_write_count = 0, + + io_batch_size = IoBatchSize }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1748,6 +1752,7 @@ reduce_memory_use(State = #vqstate { ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, + io_batch_size = IoBatchSize, rates = #rates { in = AvgIngress, out = AvgEgress, ack_in = AvgAckIngress, @@ -1775,7 +1780,7 @@ reduce_memory_use(State = #vqstate { case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of - S2 when S2 >= ?IO_BATCH_SIZE -> + S2 when S2 >= IoBatchSize -> %% There is an implicit, but subtle, upper bound here. We %% may shuffle a lot of messages from Q2/3 into delta, but %% the number of these that require any disk operation, |
