summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl13
-rw-r--r--src/rabbit_mirror_queue_sync.erl14
3 files changed, 25 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 20a5285542..67c8f34ac3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -148,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats,
QName, "Synchronising: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName),
+ SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
+ Log("batch size: ~p", [SyncBatchSize]),
Ref = make_ref(),
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
- Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
+ Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 0cddd0ddd8..4b22c2bf25 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,7 +22,7 @@
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
- log_info/3, log_warning/3]).
+ sync_batch_size/1, log_info/3, log_warning/3]).
%% for testing only
-export([module/1]).
@@ -45,6 +45,9 @@
{requires, rabbit_registry},
{enables, recovery}]}).
+%% For compatibility with versions that don't support sync batching.
+-define(DEFAULT_BATCH_SIZE, 1).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -365,6 +368,14 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
ok
end.
+sync_batch_size(#amqqueue{} = Q) ->
+ case policy(<<"ha-sync-batch-size">>, Q) of
+ BatchSize when BatchSize > 1 ->
+ BatchSize;
+ _ ->
+ ?DEFAULT_BATCH_SIZE
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 0bca3b2cc4..3ef4e9d9ac 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -65,9 +65,10 @@
-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
log_fun(), [pid()]) -> pid()).
--spec(master_go/7 :: (pid(), reference(), log_fun(),
+-spec(master_go/8 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
+ non_neg_integer(),
bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
@@ -90,13 +91,13 @@ master_prepare(Ref, QName, Log, SPids) ->
syncer(Ref, Log, MPid, SPids)
end).
-master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
- case maybe_batch() of
+ case maybe_batch(SyncBatchSize) of
true ->
master_batch_go0(Args, BQ, BQS);
false ->
@@ -202,7 +203,10 @@ handle_set_maximum_since_use() ->
ok
end.
-maybe_batch() -> true.
+maybe_batch(SyncBatchSize) when SyncBatchSize > 1 ->
+ true;
+maybe_batch(_SyncBatchSize) ->
+ false.
%% Master
%% ---------------------------------------------------------------------------