summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-12-28 00:36:22 +0300
committerMichael Klishin <michael@clojurewerkz.org>2021-12-28 00:36:22 +0300
commita29a68a63297ec64a28d14ae260ef66511f2c5d1 (patch)
treeb040b4a28ceb680c1d01e1e728e7150d49f31105
parent49809d1516ec2909c9306c1fb46baf06312ba365 (diff)
parentb569ab5d74c9078c285e1b1cfce6fdd5f2291c79 (diff)
downloadrabbitmq-server-git-a29a68a63297ec64a28d14ae260ef66511f2c5d1.tar.gz
Merge branch 'thuandb-master'
-rw-r--r--deps/rabbit/BUILD.bazel8
-rw-r--r--deps/rabbit/priv/schema/rabbit.schema17
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_master.erl10
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_misc.erl22
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_sync.erl70
-rw-r--r--deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl86
-rw-r--r--deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl29
7 files changed, 226 insertions, 16 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index f607f38246..ebb517d2ad 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -989,6 +989,14 @@ suites = [
size = "medium",
flaky = True,
),
+    # Suite names must match the test modules added under deps/rabbit/test/.
+    rabbitmq_suite(
+        name = "unit_classic_mirrored_queue_sync_throttling_SUITE",
+        size = "small",
+    ),
+    rabbitmq_suite(
+        name = "unit_classic_mirrored_queue_throughput_SUITE",
+        size = "small",
+    ),
]
assert_suites(
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index a9435fadb8..1537639b10 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -1021,6 +1021,23 @@ end}.
{mapping, "mirroring_sync_batch_size", "rabbit.mirroring_sync_batch_size",
[{datatype, bytesize}, {validators, ["mirroring_sync_batch_size"]}]}.
+%% Mirror sync max throughput (in bytes) per second.
+%% Supported unit symbols:
+%% k, kiB: kibibytes (2^10 - 1,024 bytes)
+%% M, MiB: mebibytes (2^20 - 1,048,576 bytes)
+%% G, GiB: gibibytes (2^30 - 1,073,741,824 bytes)
+%% kB: kilobytes (10^3 - 1,000 bytes)
+%% MB: megabytes (10^6 - 1,000,000 bytes)
+%% GB: gigabytes (10^9 - 1,000,000,000 bytes)
+%%
+%% 0 means "no limit".
+%%
+%% {mirroring_sync_max_throughput, 0},
+
+%% Maps the "mirroring_sync_max_throughput" config key onto the
+%% rabbit.mirroring_sync_max_throughput application environment key.
+%% The value is accepted either as an integer or as a string.
+{mapping, "mirroring_sync_max_throughput", "rabbit.mirroring_sync_max_throughput", [
+ {datatype, [integer, string]}
+]}.
+
%% Peer discovery backend used by cluster formation.
%%
diff --git a/deps/rabbit/src/rabbit_mirror_queue_master.erl b/deps/rabbit/src/rabbit_mirror_queue_master.erl
index 370a11af2b..e7980d44bc 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_master.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_master.erl
@@ -156,13 +156,14 @@ sync_mirrors(HandleInfo, EmitStats,
{ok, Q} = rabbit_amqqueue:lookup(QName),
SPids = amqqueue:get_slave_pids(Q),
SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
- Log("batch size: ~p", [SyncBatchSize]),
+ SyncThroughput = rabbit_mirror_queue_misc:default_max_sync_throughput(),
+ log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput),
Ref = make_ref(),
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
- Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
+ Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) of
{cancelled, BQS1} -> Log(" synchronisation cancelled ", []),
{ok, S(BQS1)};
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
@@ -173,6 +174,11 @@ sync_mirrors(HandleInfo, EmitStats,
{ok, S(BQS1)}
end.
+%% Reports the effective mirror sync settings through the supplied Log fun.
+%% A throughput of 0 produces the batch-size-only message; any other value
+%% is reported alongside the batch size.
+log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput) ->
+    case SyncThroughput of
+        0 ->
+            Log("batch size: ~p", [SyncBatchSize]);
+        _ ->
+            Log("max batch size: ~p; max sync throughput: ~p bytes/s",
+                [SyncBatchSize, SyncThroughput])
+    end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl
index 7775489309..6b1e25122f 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,8 @@
is_mirrored/1, is_mirrored_ha_nodes/1,
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
- sync_batch_size/1, log_info/3, log_warning/3]).
+ sync_batch_size/1, default_max_sync_throughput/0,
+ log_info/3, log_warning/3]).
-export([stop_all_slaves/5]).
-export([sync_queue/1, cancel_sync_queue/1, queue_length/1]).
@@ -506,6 +507,25 @@ default_batch_size() ->
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
?DEFAULT_BATCH_SIZE).
+-define(DEFAULT_MAX_SYNC_THROUGHPUT, 0).
+
+%% Returns the configured maximum mirror sync throughput, read from the
+%% rabbit application's mirroring_sync_max_throughput key. When the key is
+%% not set, or its value cannot be parsed, ?DEFAULT_MAX_SYNC_THROUGHPUT is
+%% returned instead.
+default_max_sync_throughput() ->
+    parse_max_sync_throughput(
+      application:get_env(rabbit, mirroring_sync_max_throughput)).
+
+%% Turns the result of application:get_env/2 into a throughput value.
+parse_max_sync_throughput(undefined) ->
+    ?DEFAULT_MAX_SYNC_THROUGHPUT;
+parse_max_sync_throughput({ok, Configured}) ->
+    case rabbit_resource_monitor_misc:parse_information_unit(Configured) of
+        {ok, Parsed} ->
+            Parsed;
+        {error, parse_error} ->
+            %% An unparsable value is reported and then treated as the default.
+            rabbit_log:warning(
+              "The configured value for the mirroring_sync_max_throughput is "
+              "not a valid value: ~p. Disabled sync throughput control. ",
+              [Configured]),
+            ?DEFAULT_MAX_SYNC_THROUGHPUT
+    end.
+
-spec update_mirrors
(amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'.
diff --git a/deps/rabbit/src/rabbit_mirror_queue_sync.erl b/deps/rabbit/src/rabbit_mirror_queue_sync.erl
index 896bdd5c61..26554ece83 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_sync.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_sync.erl
@@ -9,10 +9,15 @@
-include_lib("rabbit_common/include/rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
+-export([master_prepare/4, master_go/9, slave/7, conserve_resources/3]).
+
+%% Export for UTs
+-export([maybe_master_batch_send/2, get_time_diff/3, append_to_acc/4]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
+-define(SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS, 50).
+
%% There are three processes around, the master, the syncer and the
%% slave(s). The syncer is an intermediary, linked to the master in
%% order to make sure we do not mess with the master's credit flow or
@@ -67,23 +72,24 @@ master_prepare(Ref, QName, Log, SPids) ->
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
non_neg_integer(),
+ non_neg_integer(),
bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'cancelled', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}.
-master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
- master_batch_go0(Args, SyncBatchSize,
+ master_batch_go0(Args, SyncBatchSize, SyncThroughput,
BQ, BQS)
end.
-master_batch_go0(Args, BatchSize, BQ, BQS) ->
+master_batch_go0(Args, BatchSize, SyncThroughput, BQ, BQS) ->
FoldFun =
fun (Msg, MsgProps, Unacked, Acc) ->
Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc),
@@ -92,24 +98,27 @@ master_batch_go0(Args, BatchSize, BQ, BQS) ->
false -> {cont, Acc1}
end
end,
- FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
+ FoldAcc = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {Batch, I, {Curr, Len}, Last}) ->
+ {Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, Last}) ->
T = maybe_emit_stats(Last, I, EmitStats, Log),
HandleInfo({syncing, I}),
handle_set_maximum_since_use(),
SyncMsg = {msgs, Ref, lists:reverse(Batch)},
- NewAcc = {[], I + length(Batch), {Curr, Len}, T},
+ NewAcc = {[], I + length(Batch), {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T},
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
%% Either send messages when we reach the last one in the queue or
%% whenever we have accumulated BatchSize messages.
-maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) ->
+maybe_master_batch_send({_, _, _, {Len, Len}, _}, _BatchSize) ->
+ true;
+maybe_master_batch_send({_, _, _, {Curr, _Len}, _}, BatchSize)
+ when Curr rem BatchSize =:= 0 ->
true;
-maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
- when Curr rem BatchSize =:= 0 ->
+maybe_master_batch_send({_, _, {TotalBytes, _, SyncThroughput}, {_Curr, _Len}, _}, _BatchSize)
+ when TotalBytes > SyncThroughput ->
true;
maybe_master_batch_send(_Acc, _BatchSize) ->
false.
@@ -121,8 +130,10 @@ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
- {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
+%% Prepends the message to the pending batch and bumps the current position.
+%% The third accumulator element is {TotalBytes, LastCheck, SyncThroughput};
+%% see account_bytes/2 for how it is updated.
+append_to_acc(Msg, MsgProps, Unacked, {Batch, I, Throttle, {Curr, Len}, T}) ->
+    Entry = {Msg, MsgProps, Unacked},
+    {[Entry | Batch], I, account_bytes(Msg, Throttle), {Curr + 1, Len}, T}.
+
+%% With a SyncThroughput of 0 the byte counters are not maintained and the
+%% triple is reset to {0, 0, 0}; otherwise the message size is added to
+%% TotalBytes while LastCheck and SyncThroughput are kept as they are.
+account_bytes(_Msg, {_, _, 0}) ->
+    {0, 0, 0};
+account_bytes(Msg, {TotalBytes, LastCheck, SyncThroughput}) ->
+    {TotalBytes + rabbit_basic:msg_size(Msg), LastCheck, SyncThroughput}.
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
receive
@@ -131,11 +142,44 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
gen_server2:reply(From, ok),
{stop, cancelled};
{next, Ref} -> Syncer ! SyncMsg,
- {cont, NewAcc};
+ {Msgs, I , {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T} = NewAcc,
+ {NewTotalBytes, NewLastCheck} = maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput),
+ {cont, {Msgs, I, {NewTotalBytes, NewLastCheck, SyncThroughput}, {Curr, Len}, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
+%% Decides whether the sync should be slowed down and returns the
+%% {TotalBytes, LastCheck} pair to carry into the next batch.
+%%
+%% - SyncThroughput = 0: no throttling; the counter is reported as 0 and
+%%   LastCheck as the current monotonic time.
+%% - Otherwise the time elapsed since LastCheck is converted to milliseconds.
+%%   Only when it exceeds ?SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS is the
+%%   throughput evaluated (possibly pausing this process), after which both
+%%   TotalBytes and LastCheck are reset. Below the interval both values are
+%%   returned untouched so bytes keep accumulating.
+maybe_throttle_sync_throughput(_, _, 0) ->
+    {0, erlang:monotonic_time()};
+maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput) ->
+    Elapsed = erlang:monotonic_time() - LastCheck,
+    %% 'millisecond' replaces the deprecated 'milli_seconds' time unit.
+    Interval = erlang:convert_time_unit(Elapsed, native, millisecond),
+    case Interval > ?SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS of
+        true ->
+            maybe_pause_sync(TotalBytes, Interval, SyncThroughput),
+            %% Reset the TotalBytes counter and LastCheck. The time is read
+            %% again here so that it is taken after any pause.
+            {0, erlang:monotonic_time()};
+        false ->
+            {TotalBytes, LastCheck}
+    end.
+
+%% Computes the pause required by get_time_diff/3 for the given traffic and
+%% hands it to pause_queue_sync/1.
+maybe_pause_sync(TotalBytes, Interval, SyncThroughput) ->
+    pause_queue_sync(get_time_diff(TotalBytes, Interval, SyncThroughput)).
+
+%% Sleeps for Delta milliseconds, logging the decision at debug level.
+%% A Delta of 0 only logs that the throughput is within bounds.
+pause_queue_sync(Delta) ->
+    case Delta of
+        0 ->
+            rabbit_log_mirroring:debug("Sync throughput is ok.");
+        _ ->
+            rabbit_log_mirroring:debug("Sync throughput exceeds threshold. Pause queue sync for ~p ms", [Delta]),
+            timer:sleep(Delta)
+    end.
+
+%% Sync throughput computation:
+%% - Total bytes sent since the last check: TotalBytes
+%% - Time elapsed since the last check: Interval (in milliseconds)
+%% - Effective throughput in bytes/s: TotalBytes / Interval * 1000
+%% - When the effective throughput exceeds SyncThroughput, the sync has to
+%%   slow down to compensate for the over-used rate.
+%% The pause (in milliseconds) is the difference between the time needed to
+%% send TotalBytes at the maximum throughput and the elapsed time (Interval);
+%% it is never negative.
+%%
+%% Both Interval and SyncThroughput are used as divisors and therefore must
+%% be non-zero.
+get_time_diff(TotalBytes, Interval, SyncThroughput) ->
+    EffectiveThroughput = round(TotalBytes * 1000 / Interval),
+    rabbit_log_mirroring:debug(
+      "Total ~p bytes has been sent over last ~p ms. Effective sync throughput: ~p",
+      [TotalBytes, Interval, EffectiveThroughput]),
+    max(round(TotalBytes / SyncThroughput * 1000 - Interval), 0).
+
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
{'$gen_call', From,
diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl
new file mode 100644
index 0000000000..502d5e430e
--- /dev/null
+++ b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl
@@ -0,0 +1,86 @@
+-module(unit_classic_mirrored_queue_sync_throttling_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+%% Common Test callback: the test cases of this suite.
+all() ->
+    [maybe_master_batch_send, get_time_diff, append_to_acc].
+
+%% Checks when rabbit_mirror_queue_sync:maybe_master_batch_send/2 decides
+%% that the pending batch must be sent.
+maybe_master_batch_send(_Config) ->
+    SyncBatchSize = 4096,
+    SyncThroughput = 2000,
+    QueueLen = 10000,
+    %% Builds an accumulator with the given byte counter and queue position.
+    ShouldSend =
+        fun(TotalBytes, Curr) ->
+            Acc = {[], 0, {TotalBytes, 0, SyncThroughput}, {Curr, QueueLen}, 0},
+            rabbit_mirror_queue_sync:maybe_master_batch_send(Acc, SyncBatchSize)
+        end,
+    %% The last message in the queue has been reached.
+    ?assertEqual(true, ShouldSend(0, QueueLen)),
+    %% The number of batched messages has reached the batch size.
+    ?assertEqual(true, ShouldSend(0, SyncBatchSize)),
+    %% The accumulated bytes exceed the max sync throughput.
+    ?assertEqual(true, ShouldSend(SyncThroughput + 1, 1)),
+    %% Fewer messages than the batch size and fewer bytes than the max
+    %% sync throughput: keep accumulating.
+    ?assertEqual(false, ShouldSend(1, 1)),
+    ok.
+
+%% Checks the pause computed by rabbit_mirror_queue_sync:get_time_diff/3.
+%% In every case 100 bytes were sent over 1000 ms, i.e. 100 bytes/s.
+get_time_diff(_Config) ->
+    %% {TotalBytes, Interval (ms), MaxSyncThroughput (bytes/s), expected pause (ms)}
+    Cases = [
+        {100, 1000, 100, 0},    %% matches the max throughput => no pause
+        {100, 1000, 200, 0},    %% below the max throughput => no pause
+        {100, 1000, 50, 1000}   %% above the max throughput => pause for 1000 ms
+    ],
+    lists:foreach(
+        fun({TotalBytes, Interval, MaxSyncThroughput, ExpectedPause}) ->
+            ?assertEqual(
+                ExpectedPause,
+                rabbit_mirror_queue_sync:get_time_diff(TotalBytes, Interval, MaxSyncThroughput))
+        end,
+        Cases),
+    ok.
+
+%% Checks how rabbit_mirror_queue_sync:append_to_acc/4 maintains the
+%% TotalBytes counter of the fold accumulator.
+append_to_acc(_Config) ->
+    Payload = <<"1234567890">>, %% 10 bytes
+    Msg = #basic_message{
+        id = 1,
+        content = #content{
+            properties = #'P_basic'{priority = 2},
+            payload_fragments_rev = [[Payload]]
+        },
+        is_persistent = true
+    },
+    BQDepth = 10,
+    %% Appends Msg to a fresh accumulator and returns the resulting TotalBytes.
+    Append =
+        fun(TotalBytes, SyncThroughput) ->
+            FoldAcc = {[], 0, {TotalBytes, erlang:monotonic_time(), SyncThroughput},
+                       {0, BQDepth}, erlang:monotonic_time()},
+            {_, _, {NewTotalBytes, _, _}, _, _} =
+                rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc),
+            NewTotalBytes
+        end,
+
+    %% TotalBytes is not calculated for the pending batch when SyncThroughput is 0.
+    ?assertEqual(0, Append(0, 0)),
+
+    %% The message size is added to an empty counter.
+    SyncThroughput = 100,
+    TotalBytes2 = Append(0, SyncThroughput),
+    ?assertEqual(10, TotalBytes2),
+
+    %% The message size is added to an existing counter.
+    ?assertEqual(TotalBytes2 + 10, Append(TotalBytes2, SyncThroughput)),
+    ok. \ No newline at end of file
diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl
new file mode 100644
index 0000000000..7e10b5f5d9
--- /dev/null
+++ b/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl
@@ -0,0 +1,29 @@
+-module(unit_classic_mirrored_queue_throughput_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+%% Common Test callback: the test cases of this suite.
+all() ->
+    [default_max_sync_throughput].
+
+%% Checks rabbit_mirror_queue_misc:default_max_sync_throughput/0 with the
+%% setting absent, given as an integer, and given as a string with a unit.
+%%
+%% The test owns the rabbit/mirroring_sync_max_throughput env key for its
+%% duration: it clears the key first, so the "not configured" assertion does
+%% not depend on what ran before, and restores the previous value afterwards
+%% so the setting does not leak into other test cases.
+default_max_sync_throughput(_Config) ->
+    Key = mirroring_sync_max_throughput,
+    Previous = application:get_env(rabbit, Key),
+    application:unset_env(rabbit, Key),
+    try
+        %% Not configured: throttling is disabled.
+        ?assertEqual(
+            0,
+            rabbit_mirror_queue_misc:default_max_sync_throughput()),
+        lists:foreach(
+            fun({Configured, Expected}) ->
+                application:set_env(rabbit, Key, Configured),
+                ?assertEqual(
+                    Expected,
+                    rabbit_mirror_queue_misc:default_max_sync_throughput())
+            end,
+            [{100, 100},
+             {"100MiB", 100*1024*1024},
+             {"100MB", 100000000}])
+    after
+        case Previous of
+            {ok, Value} -> application:set_env(rabbit, Key, Value);
+            undefined -> application:unset_env(rabbit, Key)
+        end
+    end,
+    ok.