diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2021-12-28 00:36:22 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2021-12-28 00:36:22 +0300 |
| commit | a29a68a63297ec64a28d14ae260ef66511f2c5d1 (patch) | |
| tree | b040b4a28ceb680c1d01e1e728e7150d49f31105 | |
| parent | 49809d1516ec2909c9306c1fb46baf06312ba365 (diff) | |
| parent | b569ab5d74c9078c285e1b1cfce6fdd5f2291c79 (diff) | |
| download | rabbitmq-server-git-a29a68a63297ec64a28d14ae260ef66511f2c5d1.tar.gz | |
Merge branch 'thuandb-master'
| -rw-r--r-- | deps/rabbit/BUILD.bazel | 8 | ||||
| -rw-r--r-- | deps/rabbit/priv/schema/rabbit.schema | 17 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_master.erl | 10 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_misc.erl | 22 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_sync.erl | 70 | ||||
| -rw-r--r-- | deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl | 86 | ||||
| -rw-r--r-- | deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl | 29 |
7 files changed, 226 insertions, 16 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index f607f38246..ebb517d2ad 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -989,6 +989,14 @@ suites = [ size = "medium", flaky = True, ), + rabbitmq_suite( + name = "rabbit_mirror_queue_sync_SUITE", + size = "small", + ), + rabbitmq_suite( + name = "rabbit_mirror_queue_misc_SUITE", + size = "small", + ), ] assert_suites( diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index a9435fadb8..1537639b10 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -1021,6 +1021,23 @@ end}. {mapping, "mirroring_sync_batch_size", "rabbit.mirroring_sync_batch_size", [{datatype, bytesize}, {validators, ["mirroring_sync_batch_size"]}]}. +%% Mirror sync max throughput (in bytes) per second. +%% Supported unit symbols: +%% k, kiB: kibibytes (2^10 - 1,024 bytes) +%% M, MiB: mebibytes (2^20 - 1,048,576 bytes) +%% G, GiB: gibibytes (2^30 - 1,073,741,824 bytes) +%% kB: kilobytes (10^3 - 1,000 bytes) +%% MB: megabytes (10^6 - 1,000,000 bytes) +%% GB: gigabytes (10^9 - 1,000,000,000 bytes) +%% +%% 0 means "no limit". +%% +%% {mirroring_sync_max_throughput, 0}, + +{mapping, "mirroring_sync_max_throughput", "rabbit.mirroring_sync_max_throughput", [ + {datatype, [integer, string]} +]}. + %% Peer discovery backend used by cluster formation. %% diff --git a/deps/rabbit/src/rabbit_mirror_queue_master.erl b/deps/rabbit/src/rabbit_mirror_queue_master.erl index 370a11af2b..e7980d44bc 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_master.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_master.erl @@ -156,13 +156,14 @@ sync_mirrors(HandleInfo, EmitStats, {ok, Q} = rabbit_amqqueue:lookup(QName), SPids = amqqueue:get_slave_pids(Q), SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q), - Log("batch size: ~p", [SyncBatchSize]), + SyncThroughput = rabbit_mirror_queue_misc:default_max_sync_throughput(), + log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput), Ref = make_ref(), Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( - Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of + Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) of {cancelled, BQS1} -> Log(" synchronisation cancelled ", []), {ok, S(BQS1)}; {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; @@ -173,6 +174,11 @@ sync_mirrors(HandleInfo, EmitStats, {ok, S(BQS1)} end. +log_mirror_sync_config(Log, SyncBatchSize, 0) -> + Log("batch size: ~p", [SyncBatchSize]); +log_mirror_sync_config(Log, SyncBatchSize, SyncThroughput) -> + Log("max batch size: ~p; max sync throughput: ~p bytes/s", [SyncBatchSize, SyncThroughput]). + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_misc.erl b/deps/rabbit/src/rabbit_mirror_queue_misc.erl index 7775489309..6b1e25122f 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_misc.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_misc.erl @@ -16,7 +16,8 @@ is_mirrored/1, is_mirrored_ha_nodes/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, - sync_batch_size/1, log_info/3, log_warning/3]). + sync_batch_size/1, default_max_sync_throughput/0, + log_info/3, log_warning/3]). -export([stop_all_slaves/5]). -export([sync_queue/1, cancel_sync_queue/1, queue_length/1]). @@ -506,6 +507,25 @@ default_batch_size() -> rabbit_misc:get_env(rabbit, mirroring_sync_batch_size, ?DEFAULT_BATCH_SIZE). +-define(DEFAULT_MAX_SYNC_THROUGHPUT, 0). + +default_max_sync_throughput() -> + case application:get_env(rabbit, mirroring_sync_max_throughput) of + {ok, Value} -> + case rabbit_resource_monitor_misc:parse_information_unit(Value) of + {ok, ParsedThroughput} -> + ParsedThroughput; + {error, parse_error} -> + rabbit_log:warning( + "The configured value for the mirroring_sync_max_throughput is " + "not a valid value: ~p. Disabled sync throughput control. ", + [Value]), + ?DEFAULT_MAX_SYNC_THROUGHPUT + end; + undefined -> + ?DEFAULT_MAX_SYNC_THROUGHPUT + end. + -spec update_mirrors (amqqueue:amqqueue(), amqqueue:amqqueue()) -> 'ok'. diff --git a/deps/rabbit/src/rabbit_mirror_queue_sync.erl b/deps/rabbit/src/rabbit_mirror_queue_sync.erl index 896bdd5c61..26554ece83 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_sync.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_sync.erl @@ -9,10 +9,15 @@ -include_lib("rabbit_common/include/rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). +-export([master_prepare/4, master_go/9, slave/7, conserve_resources/3]). + +%% Export for UTs +-export([maybe_master_batch_send/2, get_time_diff/3, append_to_acc/4]). -define(SYNC_PROGRESS_INTERVAL, 1000000). +-define(SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS, 50). + %% There are three processes around, the master, the syncer and the %% slave(s). The syncer is an intermediary, linked to the master in %% order to make sure we do not mess with the master's credit flow or @@ -67,23 +72,24 @@ master_prepare(Ref, QName, Log, SPids) -> rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), non_neg_integer(), + non_neg_integer(), bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'cancelled', bqs()} | {'shutdown', any(), bqs()} | {'sync_died', any(), bqs()}. -master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, SyncThroughput, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; {ready, Syncer} -> EmitStats({syncing, 0}), - master_batch_go0(Args, SyncBatchSize, + master_batch_go0(Args, SyncBatchSize, SyncThroughput, BQ, BQS) end. -master_batch_go0(Args, BatchSize, BQ, BQS) -> +master_batch_go0(Args, BatchSize, SyncThroughput, BQ, BQS) -> FoldFun = fun (Msg, MsgProps, Unacked, Acc) -> Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc), @@ -92,24 +98,27 @@ master_batch_go0(Args, BatchSize, BQ, BQS) -> false -> {cont, Acc1} end end, - FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()}, + FoldAcc = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQ:depth(BQS)}, erlang:monotonic_time()}, bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {Batch, I, {Curr, Len}, Last}) -> + {Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, Last}) -> T = maybe_emit_stats(Last, I, EmitStats, Log), HandleInfo({syncing, I}), handle_set_maximum_since_use(), SyncMsg = {msgs, Ref, lists:reverse(Batch)}, - NewAcc = {[], I + length(Batch), {Curr, Len}, T}, + NewAcc = {[], I + length(Batch), {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T}, master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). %% Either send messages when we reach the last one in the queue or %% whenever we have accumulated BatchSize messages. -maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) -> +maybe_master_batch_send({_, _, _, {Len, Len}, _}, _BatchSize) -> + true; +maybe_master_batch_send({_, _, _, {Curr, _Len}, _}, BatchSize) + when Curr rem BatchSize =:= 0 -> true; -maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize) - when Curr rem BatchSize =:= 0 -> +maybe_master_batch_send({_, _, {TotalBytes, _, SyncThroughput}, {_Curr, _Len}, _}, _BatchSize) + when TotalBytes > SyncThroughput -> true; maybe_master_batch_send(_Acc, _BatchSize) -> false. @@ -121,8 +130,10 @@ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) -> - {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}. +append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {_, _, 0}, {Curr, Len}, T}) -> + {[{Msg, MsgProps, Unacked} | Batch], I, {0, 0, 0}, {Curr + 1, Len}, T}; +append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T}) -> + {[{Msg, MsgProps, Unacked} | Batch], I, {TotalBytes + rabbit_basic:msg_size(Msg), LastCheck, SyncThroughput}, {Curr + 1, Len}, T}. master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) -> receive @@ -131,11 +142,44 @@ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) -> gen_server2:reply(From, ok), {stop, cancelled}; {next, Ref} -> Syncer ! SyncMsg, - {cont, NewAcc}; + {Msgs, I , {TotalBytes, LastCheck, SyncThroughput}, {Curr, Len}, T} = NewAcc, + {NewTotalBytes, NewLastCheck} = maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput), + {cont, {Msgs, I, {NewTotalBytes, NewLastCheck, SyncThroughput}, {Curr, Len}, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. +maybe_throttle_sync_throughput(_ , _, 0) -> + {0, erlang:monotonic_time()}; +maybe_throttle_sync_throughput(TotalBytes, LastCheck, SyncThroughput) -> + Interval = erlang:convert_time_unit(erlang:monotonic_time() - LastCheck, native, milli_seconds), + case Interval > ?SYNC_THROUGHPUT_EVAL_INTERVAL_MILLIS of + true -> maybe_pause_sync(TotalBytes, Interval, SyncThroughput), + {0, erlang:monotonic_time()}; %% reset TotalBytes counter and LastCheck.; + false -> {TotalBytes, LastCheck} + end. + +maybe_pause_sync(TotalBytes, Interval, SyncThroughput) -> + Delta = get_time_diff(TotalBytes, Interval, SyncThroughput), + pause_queue_sync(Delta). + +pause_queue_sync(0) -> + rabbit_log_mirroring:debug("Sync throughput is ok."); +pause_queue_sync(Delta) -> + rabbit_log_mirroring:debug("Sync throughput exceeds threshold. Pause queue sync for ~p ms", [Delta]), + timer:sleep(Delta). + +%% Sync throughput computation: +%% - Total bytes have been sent since last check: TotalBytes +%% - Used/Elapsed time since last check: Interval (in milliseconds) +%% - Effective/Used throughput in bytes/s: TotalBytes/Interval * 1000. +%% - When UsedThroughput > SyncThroughput -> we need to slow down to compensate over-used rate. +%% The amount of time to pause queue sync is the different between time needed to broadcast TotalBytes at max throughput +%% and the elapsed time (Interval). +get_time_diff(TotalBytes, Interval, SyncThroughput) -> + rabbit_log_mirroring:debug("Total ~p bytes has been sent over last ~p ms. Effective sync througput: ~p", [TotalBytes, Interval, round(TotalBytes * 1000 / Interval)]), + max(round(TotalBytes/SyncThroughput * 1000 - Interval), 0). + master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) -> receive {'$gen_call', From, diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl new file mode 100644 index 0000000000..502d5e430e --- /dev/null +++ b/deps/rabbit/test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl @@ -0,0 +1,86 @@ +-module(unit_classic_mirrored_queue_sync_throttling_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + maybe_master_batch_send, + get_time_diff, + append_to_acc + ]. + +maybe_master_batch_send(_Config) -> + SyncBatchSize = 4096, + SyncThroughput = 2000, + QueueLen = 10000, + ?assertEqual( + true, %% Message reach the last one in the queue + rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {QueueLen, QueueLen}, 0}, SyncBatchSize)), + ?assertEqual( + true, %% # messages batched is less than batch size; and total message size has reached the batch size + rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {0, 0, SyncThroughput}, {SyncBatchSize, QueueLen}, 0}, SyncBatchSize)), + TotalBytes0 = SyncThroughput + 1, + Curr0 = 1, + ?assertEqual( + true, %% Total batch size exceed max sync throughput + rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes0, 0, SyncThroughput}, {Curr0, QueueLen}, 0}, SyncBatchSize)), + TotalBytes1 = 1, + Curr1 = 1, + ?assertEqual( + false, %% # messages batched is less than batch size; and total bytes is less than sync throughput + rabbit_mirror_queue_sync:maybe_master_batch_send({[], 0, {TotalBytes1, 0, SyncThroughput}, {Curr1, QueueLen}, 0}, SyncBatchSize)), + ok. + +get_time_diff(_Config) -> + TotalBytes0 = 100, + Interval0 = 1000, %% ms + MaxSyncThroughput0 = 100, %% bytes/s + ?assertEqual(%% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; matched max throughput + 0, %% => no need to pause queue sync + rabbit_mirror_queue_sync:get_time_diff(TotalBytes0, Interval0, MaxSyncThroughput0)), + + TotalBytes1 = 100, + Interval1 = 1000, %% ms + MaxSyncThroughput1 = 200, %% bytes/s + ?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; less than max throughput + 0, %% => no need to pause queue sync + rabbit_mirror_queue_sync:get_time_diff(TotalBytes1, Interval1, MaxSyncThroughput1)), + + TotalBytes2 = 100, + Interval2 = 1000, %% ms + MaxSyncThroughput2 = 50, %% bytes/s + ?assertEqual( %% Used throughput = 100 / 1000 * 1000 = 100 bytes/s; greater than max throughput + 1000, %% => pause queue sync for 1000 ms + rabbit_mirror_queue_sync:get_time_diff(TotalBytes2, Interval2, MaxSyncThroughput2)), + ok. + +append_to_acc(_Config) -> + Msg = #basic_message{ + id = 1, + content = #content{ + properties = #'P_basic'{ + priority = 2 + }, + payload_fragments_rev = [[<<"1234567890">>]] %% 10 bytes + }, + is_persistent = true + }, + BQDepth = 10, + SyncThroughput_0 = 0, + FoldAcc1 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput_0}, {0, BQDepth}, erlang:monotonic_time()}, + {_, _, {TotalBytes1, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc1), + ?assertEqual(0, TotalBytes1), %% Skipping calculating TotalBytes for the pending batch as SyncThroughput is 0. + + SyncThroughput = 100, + FoldAcc2 = {[], 0, {0, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()}, + {_, _, {TotalBytes2, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc2), + ?assertEqual(10, TotalBytes2), %% Message size is added to existing TotalBytes + + FoldAcc3 = {[], 0, {TotalBytes2, erlang:monotonic_time(), SyncThroughput}, {0, BQDepth}, erlang:monotonic_time()}, + {_, _, {TotalBytes3, _, _}, _, _} = rabbit_mirror_queue_sync:append_to_acc(Msg, {}, false, FoldAcc3), + ?assertEqual(TotalBytes2 + 10, TotalBytes3), %% Message size is added to existing TotalBytes + ok.
\ No newline at end of file diff --git a/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl b/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl new file mode 100644 index 0000000000..7e10b5f5d9 --- /dev/null +++ b/deps/rabbit/test/unit_classic_mirrored_queue_throughput_SUITE.erl @@ -0,0 +1,29 @@ +-module(unit_classic_mirrored_queue_throughput_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + default_max_sync_throughput + ]. + +default_max_sync_throughput(_Config) -> + ?assertEqual( + 0, + rabbit_mirror_queue_misc:default_max_sync_throughput()), + application:set_env(rabbit, mirroring_sync_max_throughput, 100), + ?assertEqual( + 100, + rabbit_mirror_queue_misc:default_max_sync_throughput()), + application:set_env(rabbit, mirroring_sync_max_throughput, "100MiB"), + ?assertEqual( + 100*1024*1024, + rabbit_mirror_queue_misc:default_max_sync_throughput()), + application:set_env(rabbit, mirroring_sync_max_throughput, "100MB"), + ?assertEqual( + 100000000, + rabbit_mirror_queue_misc:default_max_sync_throughput()), + ok. |
