diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-08-27 18:11:08 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-08-27 18:11:08 +0300 |
| commit | a576769d5aff720938077f8a75aa33d5a4ae7c7f (patch) | |
| tree | 95d02ff32fed8d2dc01c6f7b9f4ecec254ab2413 | |
| parent | 67c24aa1ccf5209660d54ad49033a1d1e3cc0502 (diff) | |
| download | rabbitmq-server-git-a576769d5aff720938077f8a75aa33d5a4ae7c7f.tar.gz | |
Merge stable into master
| -rw-r--r-- | ebin/rabbit_app.in | 7 | ||||
| -rw-r--r-- | include/rabbit.hrl | 4 | ||||
| -rw-r--r-- | src/rabbit.erl | 84 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 | ||||
| -rw-r--r-- | test/src/on_disk_store_tunable_parameter_validation_test.erl | 47 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 1 |
9 files changed, 196 insertions, 22 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 48edbd4d1e..4bfdd192f5 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -30,7 +30,7 @@ {msg_store_file_size_limit, 16777216}, {fhc_write_buffering, true}, {fhc_read_buffering, true}, - {queue_index_max_journal_entries, 65536}, + {queue_index_max_journal_entries, 32768}, {queue_index_embed_msgs_below, 4096}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, @@ -85,5 +85,8 @@ %% see rabbitmq-server#114 {mirroring_flow_control, true}, {credit_flow_initial_credit, 200}, - {credit_flow_more_credit_after, 50} + {credit_flow_more_credit_after, 50}, + %% see rabbitmq-server#227 and related tickets + {msg_store_credit_disc_bound, {2000, 500}}, + {msg_store_io_batch_size, 2048} ]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d16a38f981..9ad99a754a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -125,6 +125,10 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). +%% When we discover that we should write some indices to disk for some +%% betas, the IO_BATCH_SIZE sets the number of betas that we must be +%% due to write indices for before we do any work at all. +-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). diff --git a/src/rabbit.erl b/src/rabbit.erl index 5152c11eeb..d11b8d95a5 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -30,6 +30,9 @@ %% Boot steps. -export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). +%% for tests +-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]). + -rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}). -rabbit_boot_step({codec_correctness_check, @@ -517,6 +520,7 @@ start(normal, []) -> print_banner(), log_banner(), warn_if_kernel_config_dubious(), + warn_if_disc_io_options_dubious(), rabbit_boot_steps:run_boot_steps(), {ok, SupPid}; Error -> @@ -788,6 +792,86 @@ warn_if_kernel_config_dubious() -> true -> ok end. +warn_if_disc_io_options_dubious() -> + %% if these values are not set, it doesn't matter since + %% rabbit_variable_queue will pick up the values defined in the + %% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants. + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + undefined), + IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, + undefined), + case catch validate_msg_store_io_batch_size_and_credit_disc_bound( + CreditDiscBound, IoBatchSize) of + ok -> ok; + {error, {Reason, Vars}} -> + rabbit_log:warning(Reason, Vars) + end. + +validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound, + IoBatchSize) -> + case IoBatchSize of + undefined -> + ok; + IoBatchSize when is_integer(IoBatchSize) -> + if IoBatchSize < ?IO_BATCH_SIZE -> + throw({error, + {"io_batch_size of ~b lower than recommended value ~b, " + "paging performance may worsen~n", + [IoBatchSize, ?IO_BATCH_SIZE]}}); + true -> + ok + end; + IoBatchSize -> + throw({error, + {"io_batch_size should be an integer, but ~b given", + [IoBatchSize]}}) + end, + + %% CreditDiscBound = {InitialCredit, MoreCreditAfter} + {RIC, RMCA} = ?CREDIT_DISC_BOUND, + case CreditDiscBound of + undefined -> + ok; + {IC, MCA} when is_integer(IC), is_integer(MCA) -> + if IC < RIC; MCA < RMCA -> + throw({error, + {"msg_store_credit_disc_bound {~b, ~b} lower than" + "recommended value {~b, ~b}," + " paging performance may worsen~n", + [IC, MCA, RIC, RMCA]}}); + true -> + ok + end; + {IC, MCA} -> + throw({error, + {"both msg_store_credit_disc_bound values should be integers, but ~p given", + [{IC, MCA}]}}); + CreditDiscBound -> + throw({error, + {"invalid msg_store_credit_disc_bound value given: ~p", + [CreditDiscBound]}}) + end, + + case {CreditDiscBound, IoBatchSize} of + {undefined, undefined} -> + ok; + {_CDB, undefined} -> + ok; + {undefined, _IBS} -> + ok; + {{InitialCredit, _MCA}, IoBatchSize} -> + if IoBatchSize < InitialCredit -> + throw( + {error, + {"msg_store_io_batch_size ~b should be bigger than the initial " + "credit value from msg_store_credit_disc_bound ~b," + " paging performance may worsen~n", + [IoBatchSize, InitialCredit]}}); + true -> + ok + end + end. + home_dir() -> case init:get_argument(home) of {ok, [[Home]]} -> Home; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b94ed6e1b7..1a6beb5438 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -99,7 +99,9 @@ %% how big are our files allowed to get? file_size_limit, %% client ref to synced messages mapping - cref_to_msg_ids + cref_to_msg_ids, + %% See CREDIT_DISC_BOUND in rabbit.hrl + credit_disc_bound }). -record(client_msstate, @@ -113,7 +115,8 @@ file_handles_ets, file_summary_ets, cur_file_cache_ets, - flying_ets + flying_ets, + credit_disc_bound }). -record(file_summary, @@ -156,7 +159,8 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), cur_file_cache_ets :: ets:tid(), - flying_ets :: ets:tid()}). + flying_ets :: ets:tid(), + credit_disc_bound :: {pos_integer(), pos_integer()}}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). @@ -476,6 +480,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gen_server2:call( Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun}, infinity), + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -486,7 +492,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_ets = FlyingEts }. + flying_ets = FlyingEts, + credit_disc_bound = CreditDiscBound }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -499,8 +506,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> - credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND), +write_flow(MsgId, Msg, + CState = #client_msstate { + server = Server, + credit_disc_bound = CreditDiscBound }) -> + credit_flow:send(whereis(Server), CreditDiscBound), client_write(MsgId, Msg, flow, CState). write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). @@ -743,6 +753,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> msg_store = self() }), + CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound, + ?CREDIT_DISC_BOUND), + State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, @@ -762,7 +775,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - cref_to_msg_ids = dict:new() + cref_to_msg_ids = dict:new(), + credit_disc_bound = CreditDiscBound }, %% If we didn't recover the msg location index then we need to @@ -846,10 +860,11 @@ handle_cast({client_delete, CRef}, handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, - clients = Clients }) -> + clients = Clients, + credit_disc_bound = CreditDiscBound }) -> case Flow of flow -> {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:ack(CPid, ?CREDIT_DISC_BOUND); + credit_flow:ack(CPid, CreditDiscBound); noflow -> ok end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 5d87743472..f95f8c5818 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -475,8 +475,22 @@ hostname() -> cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). tcp_opts() -> - {ok, Opts} = application:get_env(rabbit, tcp_listen_options), - Opts. + {ok, ConfigOpts} = application:get_env(rabbit, tcp_listen_options), + merge_essential_tcp_listen_options(ConfigOpts). + +-define(ESSENTIAL_LISTEN_OPTIONS, + [binary, + {active, false}, + {packet, raw}, + {reuseaddr, true}, + {nodelay, true}]). + +merge_essential_tcp_listen_options(Opts) -> + lists:foldl(fun ({K, _} = Opt, Acc) -> + lists:keystore(K, 1, Acc, Opt); + (Opt, Acc) -> + [Opt | Acc] + end , Opts, ?ESSENTIAL_LISTEN_OPTIONS). %% inet_parse:address takes care of ip string, like "0.0.0.0" %% inet:getaddr returns immediately for ip tuple {0,0,0,0}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0129b848ce..544b536aa2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -127,7 +127,8 @@ %% binary generation/matching with constant vs variable lengths. -define(REL_SEQ_BITS, 14). --define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). +%% calculated as trunc(math:pow(2,?REL_SEQ_BITS))). +-define(SEGMENT_ENTRY_COUNT, 16384). %% seq only is binary 01 followed by 14 bits of rel seq id %% (range: 0 - 16383) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f66ba21605..2224a74b59 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -297,7 +297,9 @@ %% Unlike the other counters these two do not feed into %% #rates{} and get reset disk_read_count, - disk_write_count + disk_write_count, + + io_batch_size }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -320,10 +322,6 @@ end_seq_id %% end_seq_id is exclusive }). -%% When we discover that we should write some indices to disk for some -%% betas, the IO_BATCH_SIZE sets the number of betas that we must be -%% due to write indices for before we do any work at all. --define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -395,7 +393,9 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), disk_read_count :: non_neg_integer(), - disk_write_count :: non_neg_integer() }). + disk_write_count :: non_neg_integer(), + + io_batch_size :: pos_integer()}). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -1196,6 +1196,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, end_seq_id = NextSeqId }) end, Now = time_compat:monotonic_time(), + IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, + ?IO_BATCH_SIZE), State = #vqstate { q1 = ?QUEUE:new(), q2 = ?QUEUE:new(), @@ -1232,7 +1234,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, ack_out_counter = 0, ack_in_counter = 0, disk_read_count = 0, - disk_write_count = 0 }, + disk_write_count = 0, + + io_batch_size = IoBatchSize }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1809,6 +1813,7 @@ reduce_memory_use(State = #vqstate { ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, + io_batch_size = IoBatchSize, rates = #rates { in = AvgIngress, out = AvgEgress, ack_in = AvgAckIngress, @@ -1836,7 +1841,7 @@ reduce_memory_use(State = #vqstate { case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), permitted_beta_count(State1)) of - S2 when S2 >= ?IO_BATCH_SIZE -> + S2 when S2 >= IoBatchSize -> %% There is an implicit, but subtle, upper bound here. We %% may shuffle a lot of messages from Q2/3 into delta, but %% the number of these that require any disk operation, diff --git a/test/src/on_disk_store_tunable_parameter_validation_test.erl b/test/src/on_disk_store_tunable_parameter_validation_test.erl new file mode 100644 index 0000000000..9db5425e6d --- /dev/null +++ b/test/src/on_disk_store_tunable_parameter_validation_test.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(on_disk_store_tunable_parameter_validation_test). + +-include("rabbit.hrl"). + +-export([test_msg_store_parameter_validation/0]). + +-define(T(Fun, Args), (catch apply(rabbit, Fun, Args))). + +test_msg_store_parameter_validation() -> + %% make sure it works with default values + ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [?CREDIT_DISC_BOUND, ?IO_BATCH_SIZE]), + + %% IO_BATCH_SIZE must be greater than CREDIT_DISC_BOUND initial credit + ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 3000]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 1500]), + + %% All values must be integers + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, "1500"]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{"2000", 500}, abc]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, "500"}, 2048]), + + %% CREDIT_DISC_BOUND must be a tuple + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [[2000, 500], 1500]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [2000, 1500]), + + %% config values can't be smaller than default values + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{1999, 500}, 2048]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 499}, 2048]), + {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 2047]), + + passed. diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 4c1489f6aa..891af272fb 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -89,6 +89,7 @@ all_tests0() -> passed = test_configurable_server_properties(), passed = vm_memory_monitor_tests:all_tests(), passed = credit_flow_test:test_credit_flow_settings(), + passed = on_disk_store_tunable_parameter_validation_test:test_msg_store_parameter_validation(), passed. do_if_secondary_node(Up, Down) -> |
