summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-08-27 18:11:08 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-08-27 18:11:08 +0300
commita576769d5aff720938077f8a75aa33d5a4ae7c7f (patch)
tree95d02ff32fed8d2dc01c6f7b9f4ecec254ab2413 /src
parent67c24aa1ccf5209660d54ad49033a1d1e3cc0502 (diff)
downloadrabbitmq-server-git-a576769d5aff720938077f8a75aa33d5a4ae7c7f.tar.gz
Merge stable into master
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl84
-rw-r--r--src/rabbit_msg_store.erl33
-rw-r--r--src/rabbit_networking.erl18
-rw-r--r--src/rabbit_queue_index.erl3
-rw-r--r--src/rabbit_variable_queue.erl21
5 files changed, 139 insertions, 20 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5152c11eeb..d11b8d95a5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -30,6 +30,9 @@
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+%% for tests
+-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
+
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
@@ -517,6 +520,7 @@ start(normal, []) ->
print_banner(),
log_banner(),
warn_if_kernel_config_dubious(),
+ warn_if_disc_io_options_dubious(),
rabbit_boot_steps:run_boot_steps(),
{ok, SupPid};
Error ->
@@ -788,6 +792,86 @@ warn_if_kernel_config_dubious() ->
true -> ok
end.
+warn_if_disc_io_options_dubious() ->
+ %% if these values are not set, it doesn't matter since
+ %% rabbit_variable_queue will pick up the values defined in the
+ %% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants.
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ undefined),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ undefined),
+ case catch validate_msg_store_io_batch_size_and_credit_disc_bound(
+ CreditDiscBound, IoBatchSize) of
+ ok -> ok;
+ {error, {Reason, Vars}} ->
+ rabbit_log:warning(Reason, Vars)
+ end.
+
+validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound,
+ IoBatchSize) ->
+ case IoBatchSize of
+ undefined ->
+ ok;
+ IoBatchSize when is_integer(IoBatchSize) ->
+ if IoBatchSize < ?IO_BATCH_SIZE ->
+ throw({error,
+ {"io_batch_size of ~b lower than recommended value ~b, "
+ "paging performance may worsen~n",
+ [IoBatchSize, ?IO_BATCH_SIZE]}});
+ true ->
+ ok
+ end;
+ IoBatchSize ->
+ throw({error,
+ {"io_batch_size should be an integer, but ~b given",
+ [IoBatchSize]}})
+ end,
+
+ %% CreditDiscBound = {InitialCredit, MoreCreditAfter}
+ {RIC, RMCA} = ?CREDIT_DISC_BOUND,
+ case CreditDiscBound of
+ undefined ->
+ ok;
+ {IC, MCA} when is_integer(IC), is_integer(MCA) ->
+ if IC < RIC; MCA < RMCA ->
+ throw({error,
+ {"msg_store_credit_disc_bound {~b, ~b} lower than"
+ "recommended value {~b, ~b},"
+ " paging performance may worsen~n",
+ [IC, MCA, RIC, RMCA]}});
+ true ->
+ ok
+ end;
+ {IC, MCA} ->
+ throw({error,
+ {"both msg_store_credit_disc_bound values should be integers, but ~p given",
+ [{IC, MCA}]}});
+ CreditDiscBound ->
+ throw({error,
+ {"invalid msg_store_credit_disc_bound value given: ~p",
+ [CreditDiscBound]}})
+ end,
+
+ case {CreditDiscBound, IoBatchSize} of
+ {undefined, undefined} ->
+ ok;
+ {_CDB, undefined} ->
+ ok;
+ {undefined, _IBS} ->
+ ok;
+ {{InitialCredit, _MCA}, IoBatchSize} ->
+ if IoBatchSize < InitialCredit ->
+ throw(
+ {error,
+ {"msg_store_io_batch_size ~b should be bigger than the initial "
+ "credit value from msg_store_credit_disc_bound ~b,"
+ " paging performance may worsen~n",
+ [IoBatchSize, InitialCredit]}});
+ true ->
+ ok
+ end
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b94ed6e1b7..1a6beb5438 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -99,7 +99,9 @@
%% how big are our files allowed to get?
file_size_limit,
%% client ref to synced messages mapping
- cref_to_msg_ids
+ cref_to_msg_ids,
+ %% See CREDIT_DISC_BOUND in rabbit.hrl
+ credit_disc_bound
}).
-record(client_msstate,
@@ -113,7 +115,8 @@
file_handles_ets,
file_summary_ets,
cur_file_cache_ets,
- flying_ets
+ flying_ets,
+ credit_disc_bound
}).
-record(file_summary,
@@ -156,7 +159,8 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
- flying_ets :: ets:tid()}).
+ flying_ets :: ets:tid(),
+ credit_disc_bound :: {pos_integer(), pos_integer()}}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -476,6 +480,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
infinity),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -486,7 +492,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_ets = FlyingEts }.
+ flying_ets = FlyingEts,
+ credit_disc_bound = CreditDiscBound }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -499,8 +506,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
- credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+write_flow(MsgId, Msg,
+ CState = #client_msstate {
+ server = Server,
+ credit_disc_bound = CreditDiscBound }) ->
+ credit_flow:send(whereis(Server), CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -743,6 +753,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
msg_store = self()
}),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -762,7 +775,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new()
+ cref_to_msg_ids = dict:new(),
+ credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -846,10 +860,11 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
- clients = Clients }) ->
+ clients = Clients,
+ credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 5d87743472..f95f8c5818 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -475,8 +475,22 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
tcp_opts() ->
- {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
- Opts.
+ {ok, ConfigOpts} = application:get_env(rabbit, tcp_listen_options),
+ merge_essential_tcp_listen_options(ConfigOpts).
+
+-define(ESSENTIAL_LISTEN_OPTIONS,
+ [binary,
+ {active, false},
+ {packet, raw},
+ {reuseaddr, true},
+ {nodelay, true}]).
+
+merge_essential_tcp_listen_options(Opts) ->
+ lists:foldl(fun ({K, _} = Opt, Acc) ->
+ lists:keystore(K, 1, Acc, Opt);
+ (Opt, Acc) ->
+ [Opt | Acc]
+ end , Opts, ?ESSENTIAL_LISTEN_OPTIONS).
%% inet_parse:address takes care of ip string, like "0.0.0.0"
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0129b848ce..544b536aa2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -127,7 +127,8 @@
%% binary generation/matching with constant vs variable lengths.
-define(REL_SEQ_BITS, 14).
--define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
+%% calculated as trunc(math:pow(2,?REL_SEQ_BITS))).
+-define(SEGMENT_ENTRY_COUNT, 16384).
%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f66ba21605..2224a74b59 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -297,7 +297,9 @@
%% Unlike the other counters these two do not feed into
%% #rates{} and get reset
disk_read_count,
- disk_write_count
+ disk_write_count,
+
+ io_batch_size
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -320,10 +322,6 @@
end_seq_id %% end_seq_id is exclusive
}).
-%% When we discover that we should write some indices to disk for some
-%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
-%% due to write indices for before we do any work at all.
--define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -395,7 +393,9 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
- disk_write_count :: non_neg_integer() }).
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer()}).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -1196,6 +1196,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
end_seq_id = NextSeqId })
end,
Now = time_compat:monotonic_time(),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ ?IO_BATCH_SIZE),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
@@ -1232,7 +1234,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ack_out_counter = 0,
ack_in_counter = 0,
disk_read_count = 0,
- disk_write_count = 0 },
+ disk_write_count = 0,
+
+ io_batch_size = IoBatchSize },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1809,6 +1813,7 @@ reduce_memory_use(State = #vqstate {
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
+ io_batch_size = IoBatchSize,
rates = #rates { in = AvgIngress,
out = AvgEgress,
ack_in = AvgAckIngress,
@@ -1836,7 +1841,7 @@ reduce_memory_use(State = #vqstate {
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- S2 when S2 >= ?IO_BATCH_SIZE ->
+ S2 when S2 >= IoBatchSize ->
%% There is an implicit, but subtle, upper bound here. We
%% may shuffle a lot of messages from Q2/3 into delta, but
%% the number of these that require any disk operation,