summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-28 16:41:24 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-28 16:41:24 +0200
commit623f1de05bccad3170d26438816855ef1e439256 (patch)
tree4a1c744748c15e3c404ad9abf844922381fddb59
parent75071795bfa4581cc08a39529e27f06c767079f2 (diff)
parent1085e243a6df514f675b452aa96ffa23b6632ac3 (diff)
downloadrabbitmq-server-git-623f1de05bccad3170d26438816855ef1e439256.tar.gz
Merge branch 'stable' into rabbitmq-server-289
-rw-r--r--ebin/rabbit_app.in5
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit.erl84
-rw-r--r--src/rabbit_msg_store.erl32
-rw-r--r--src/rabbit_queue_index.erl135
-rw-r--r--src/rabbit_variable_queue.erl21
-rw-r--r--test/src/on_disk_store_tunable_parameter_validation_test.erl47
-rw-r--r--test/src/rabbit_tests.erl1
8 files changed, 263 insertions, 66 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 37658d7312..636851989e 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -83,5 +83,8 @@
gen_fsm, ssl]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
- {mirroring_flow_control, true}
+ {mirroring_flow_control, true},
+ %% see rabbitmq-server#227 and related tickets
+ {msg_store_credit_disc_bound, {2000, 500}},
+ {msg_store_io_batch_size, 2048}
]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index ddcfd6a648..8da29d4e65 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -122,6 +122,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
+%% When we discover that we should write some indices to disk for some
+%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
+%% due to write indices for before we do any work at all.
+-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7ee51f09f7..bb906ede4f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -30,6 +30,9 @@
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
+%% for tests
+-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
+
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
-rabbit_boot_step({codec_correctness_check,
@@ -520,6 +523,7 @@ start(normal, []) ->
print_banner(),
log_banner(),
warn_if_kernel_config_dubious(),
+ warn_if_disc_io_options_dubious(),
run_boot_steps(),
{ok, SupPid};
Error ->
@@ -848,6 +852,86 @@ warn_if_kernel_config_dubious() ->
true -> ok
end.
+warn_if_disc_io_options_dubious() ->
+ %% if these values are not set, it doesn't matter since
+ %% rabbit_variable_queue will pick up the values defined in the
+ %% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants.
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ undefined),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ undefined),
+ case catch validate_msg_store_io_batch_size_and_credit_disc_bound(
+ CreditDiscBound, IoBatchSize) of
+ ok -> ok;
+ {error, {Reason, Vars}} ->
+ rabbit_log:warning(Reason, Vars)
+ end.
+
+validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound,
+ IoBatchSize) ->
+ case IoBatchSize of
+ undefined ->
+ ok;
+ IoBatchSize when is_integer(IoBatchSize) ->
+ if IoBatchSize < ?IO_BATCH_SIZE ->
+ throw({error,
+ {"io_batch_size of ~b lower than recommended value ~b, "
+ "paging performance may worsen~n",
+ [IoBatchSize, ?IO_BATCH_SIZE]}});
+ true ->
+ ok
+ end;
+ IoBatchSize ->
+ throw({error,
+                   {"io_batch_size should be an integer, but ~p given",
+ [IoBatchSize]}})
+ end,
+
+ %% CreditDiscBound = {InitialCredit, MoreCreditAfter}
+ {RIC, RMCA} = ?CREDIT_DISC_BOUND,
+ case CreditDiscBound of
+ undefined ->
+ ok;
+ {IC, MCA} when is_integer(IC), is_integer(MCA) ->
+ if IC < RIC; MCA < RMCA ->
+ throw({error,
+                   {"msg_store_credit_disc_bound {~b, ~b} lower than "
+ "recommended value {~b, ~b},"
+ " paging performance may worsen~n",
+ [IC, MCA, RIC, RMCA]}});
+ true ->
+ ok
+ end;
+ {IC, MCA} ->
+ throw({error,
+ {"both msg_store_credit_disc_bound values should be integers, but ~p given",
+ [{IC, MCA}]}});
+ CreditDiscBound ->
+ throw({error,
+ {"invalid msg_store_credit_disc_bound value given: ~p",
+ [CreditDiscBound]}})
+ end,
+
+ case {CreditDiscBound, IoBatchSize} of
+ {undefined, undefined} ->
+ ok;
+ {_CDB, undefined} ->
+ ok;
+ {undefined, _IBS} ->
+ ok;
+ {{InitialCredit, _MCA}, IoBatchSize} ->
+ if IoBatchSize < InitialCredit ->
+ throw(
+ {error,
+ {"msg_store_io_batch_size ~b should be bigger than the initial "
+ "credit value from msg_store_credit_disc_bound ~b,"
+ " paging performance may worsen~n",
+ [IoBatchSize, InitialCredit]}});
+ true ->
+ ok
+ end
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 02a3bd0f15..d0969f1b0e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -77,7 +77,8 @@
%% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- cref_to_msg_ids %% client ref to synced messages mapping
+ cref_to_msg_ids, %% client ref to synced messages mapping
+ credit_disc_bound %% See rabbit.hrl CREDIT_DISC_BOUND
}).
-record(client_msstate,
@@ -91,7 +92,8 @@
file_handles_ets,
file_summary_ets,
cur_file_cache_ets,
- flying_ets
+ flying_ets,
+ credit_disc_bound
}).
-record(file_summary,
@@ -134,7 +136,8 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
- flying_ets :: ets:tid()}).
+ flying_ets :: ets:tid(),
+ credit_disc_bound :: {pos_integer(), pos_integer()}}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -442,6 +445,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gen_server2:call(
Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
infinity),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -452,7 +457,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_ets = FlyingEts }.
+ flying_ets = FlyingEts,
+ credit_disc_bound = CreditDiscBound }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -465,8 +471,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
- credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+write_flow(MsgId, Msg,
+ CState = #client_msstate {
+ server = Server,
+ credit_disc_bound = CreditDiscBound }) ->
+ credit_flow:send(whereis(Server), CreditDiscBound),
client_write(MsgId, Msg, flow, CState).
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -709,6 +718,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
msg_store = self()
}),
+ CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
+ ?CREDIT_DISC_BOUND),
+
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
@@ -728,7 +740,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new()
+ cref_to_msg_ids = dict:new(),
+ credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -812,10 +825,11 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
- clients = Clients }) ->
+ clients = Clients,
+ credit_disc_bound = CreditDiscBound }) ->
case Flow of
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ credit_flow:ack(CPid, CreditDiscBound);
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bfa449b2a0..7158058adb 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -181,7 +181,8 @@
unconfirmed, unconfirmed_msg,
pre_publish_cache, delivered_cache}).
--record(segment, {num, path, journal_entries, unacked}).
+-record(segment, {num, path, journal_entries,
+ entries_to_segment, unacked}).
-include("rabbit.hrl").
@@ -196,10 +197,11 @@
-type(hdl() :: ('undefined' | any())).
-type(segment() :: ('undefined' |
- #segment { num :: non_neg_integer(),
- path :: file:filename(),
- journal_entries :: array:array(),
- unacked :: non_neg_integer()
+ #segment { num :: non_neg_integer(),
+ path :: file:filename(),
+ journal_entries :: array:array(),
+ entries_to_segment :: array:array(),
+ unacked :: non_neg_integer()
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict:dict(), [segment()]}).
@@ -716,30 +718,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
add_to_journal(RelSeq, Action,
Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
unacked = UnackedCount }) ->
+
+ {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),
+
+ {JEntries1, EToSeg1} =
+ case Fun of
+ set ->
+ {array:set(RelSeq, Entry, JEntries),
+ array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
+ EToSeg)};
+ reset ->
+ {array:reset(RelSeq, JEntries),
+ array:reset(RelSeq, EToSeg)}
+ end,
+
Segment #segment {
- journal_entries = add_to_journal(RelSeq, Action, JEntries),
+ journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
unacked = UnackedCount + case Action of
?PUB -> +1;
del -> 0;
ack -> -1
- end};
+ end}.
-add_to_journal(RelSeq, Action, JEntries) ->
+action_to_entry(RelSeq, Action, JEntries) ->
case array:get(RelSeq, JEntries) of
undefined ->
- array:set(RelSeq,
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end, JEntries);
+ {set,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end};
({Pub, no_del, no_ack}) when Action == del ->
- array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ {set, {Pub, del, no_ack}};
({no_pub, del, no_ack}) when Action == ack ->
- array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ {set, {no_pub, del, ack}};
({?PUB, del, no_ack}) when Action == ack ->
- array:reset(RelSeq, JEntries)
+ {reset, none}
end.
maybe_flush_journal(State) ->
@@ -770,18 +788,23 @@ flush_journal(State = #qistate { segments = Segments }) ->
notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> Seg = array:sparse_foldr(
- fun entry_to_segment/3, [], JEntries),
- file_handle_cache_stats:update(queue_index_write),
-
- {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
- [{write_buffer, infinity}]),
- file_handle_cache:append(Hdl, Seg),
- ok = file_handle_cache:close(Hdl),
- Segment #segment { journal_entries = array_new() }
+ _ ->
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
+ %% the file_handle_cache also does a list reverse, so this
+ %% might not be required here, but before we were doing a
+ %% sparse_foldr, a lists:reverse/1 seems to be the correct
+ %% thing to do for now.
+ file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))),
+ ok = file_handle_cache:close(Hdl),
+ Segment #segment { journal_entries = array_new(),
+ entries_to_segment = array_new([]) }
end.
get_journal_handle(State = #qistate { journal_handle = undefined,
@@ -814,14 +837,16 @@ recover_journal(State) ->
Segments1 =
segment_map(
fun (Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
unacked = UnackedCountInJournal }) ->
%% We want to keep ack'd entries in so that we can
%% remove them if duplicates are in the journal. The
%% counts here are purely from the segment itself.
{SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
- {JEntries1, UnackedCountDuplicates} =
- journal_minus_segment(JEntries, SegEntries),
+ {JEntries1, EToSeg1, UnackedCountDuplicates} =
+ journal_minus_segment(JEntries, EToSeg, SegEntries),
Segment #segment { journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
unacked = (UnackedCountInJournal +
UnackedCountInSeg -
UnackedCountDuplicates) }
@@ -908,10 +933,11 @@ segment_find_or_new(Seg, Dir, Segments) ->
{ok, Segment} -> Segment;
error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
Path = filename:join(Dir, SegName),
- #segment { num = Seg,
- path = Path,
- journal_entries = array_new(),
- unacked = 0 }
+ #segment { num = Seg,
+ path = Path,
+ journal_entries = array_new(),
+ entries_to_segment = array_new([]),
+ unacked = 0 }
end.
segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
@@ -951,20 +977,20 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
- Buf;
-entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
+ Initial;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
%% NB: we are assembling the segment in reverse order here, so
%% del/ack comes first.
Buf1 = case {Del, Ack} of
{no_del, no_ack} ->
- Buf;
+ Initial;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>>,
case {Del, Ack} of
- {del, ack} -> [[Binary, Binary] | Buf];
- _ -> [Binary | Buf]
+ {del, ack} -> [[Binary, Binary] | Initial];
+ _ -> [Binary | Initial]
end
end,
case Pub of
@@ -1053,7 +1079,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
end.
array_new() ->
- array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+ array_new(undefined).
+
+array_new(Default) ->
+ array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
@@ -1099,19 +1128,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
-journal_minus_segment(JEntries, SegEntries) ->
+%%
+%% We need to update the entries_to_segment since they are just a
+%% cache of what's on the journal.
+journal_minus_segment(JEntries, EToSeg, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) ->
+ fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
SegEntry = array:get(RelSeq, SegEntries),
{Obj, UnackedRemovedDelta} =
journal_minus_segment1(JObj, SegEntry),
- {case Obj of
- keep -> JEntriesOut;
- undefined -> array:reset(RelSeq, JEntriesOut);
- _ -> array:set(RelSeq, Obj, JEntriesOut)
- end,
- UnackedRemoved + UnackedRemovedDelta}
- end, {JEntries, 0}, JEntries).
+ {JEntriesOut1, EToSegOut1} =
+ case Obj of
+ keep ->
+ {JEntriesOut, EToSegOut};
+ undefined ->
+ {array:reset(RelSeq, JEntriesOut),
+ array:reset(RelSeq, EToSegOut)};
+ _ ->
+ {array:set(RelSeq, Obj, JEntriesOut),
+ array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
+ EToSegOut)}
+ end,
+ {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
+ end, {JEntries, EToSeg, 0}, JEntries).
%% Here, the result is a tuple with the first element containing the
%% item we are adding to or modifying in the (initially fresh) journal
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a77301d37d..5c3ce90326 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -297,7 +297,9 @@
%% Unlike the other counters these two do not feed into
%% #rates{} and get reset
disk_read_count,
- disk_write_count
+ disk_write_count,
+
+ io_batch_size
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -320,10 +322,6 @@
end_seq_id %% end_seq_id is exclusive
}).
-%% When we discover that we should write some indices to disk for some
-%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
-%% due to write indices for before we do any work at all.
--define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -396,7 +394,9 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
- disk_write_count :: non_neg_integer() }).
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer()}).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -1135,6 +1135,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
end_seq_id = NextSeqId })
end,
Now = now(),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ ?IO_BATCH_SIZE),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
@@ -1171,7 +1173,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ack_out_counter = 0,
ack_in_counter = 0,
disk_read_count = 0,
- disk_write_count = 0 },
+ disk_write_count = 0,
+
+ io_batch_size = IoBatchSize },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1788,6 +1792,7 @@ reduce_memory_use(State = #vqstate {
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
+ io_batch_size = IoBatchSize,
rates = #rates { in = AvgIngress,
out = AvgEgress,
ack_in = AvgAckIngress,
@@ -1815,7 +1820,7 @@ reduce_memory_use(State = #vqstate {
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
- S2 when S2 >= ?IO_BATCH_SIZE ->
+ S2 when S2 >= IoBatchSize ->
%% There is an implicit, but subtle, upper bound here. We
%% may shuffle a lot of messages from Q2/3 into delta, but
%% the number of these that require any disk operation,
diff --git a/test/src/on_disk_store_tunable_parameter_validation_test.erl b/test/src/on_disk_store_tunable_parameter_validation_test.erl
new file mode 100644
index 0000000000..9db5425e6d
--- /dev/null
+++ b/test/src/on_disk_store_tunable_parameter_validation_test.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(on_disk_store_tunable_parameter_validation_test).
+
+-include("rabbit.hrl").
+
+-export([test_msg_store_parameter_validation/0]).
+
+-define(T(Fun, Args), (catch apply(rabbit, Fun, Args))).
+
+test_msg_store_parameter_validation() ->
+ %% make sure it works with default values
+ ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [?CREDIT_DISC_BOUND, ?IO_BATCH_SIZE]),
+
+ %% IO_BATCH_SIZE must be greater than CREDIT_DISC_BOUND initial credit
+ ok = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 3000]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 1500]),
+
+ %% All values must be integers
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, "1500"]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{"2000", 500}, abc]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, "500"}, 2048]),
+
+ %% CREDIT_DISC_BOUND must be a tuple
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [[2000, 500], 1500]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [2000, 1500]),
+
+ %% config values can't be smaller than default values
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{1999, 500}, 2048]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 499}, 2048]),
+ {error, _} = ?T(validate_msg_store_io_batch_size_and_credit_disc_bound, [{2000, 500}, 2047]),
+
+ passed.
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index e45ec82500..39a276f102 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -87,6 +87,7 @@ all_tests0() ->
end),
passed = test_configurable_server_properties(),
passed = vm_memory_monitor_tests:all_tests(),
+ passed = on_disk_store_tunable_parameter_validation_test:test_msg_store_parameter_validation(),
passed.