diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 14 |
3 files changed, 25 insertions, 8 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 20a5285542..67c8f34ac3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -148,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats, QName, "Synchronising: " ++ Fmt ++ "~n", Params) end, Log("~p messages to synchronise", [BQ:len(BQS)]), - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName), + SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q), + Log("batch size: ~p", [SyncBatchSize]), Ref = make_ref(), Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( - Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of + Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 0cddd0ddd8..4b22c2bf25 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,7 +22,7 @@ initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, - log_info/3, log_warning/3]). + sync_batch_size/1, log_info/3, log_warning/3]). %% for testing only -export([module/1]). @@ -45,6 +45,9 @@ {requires, rabbit_registry}, {enables, recovery}]}). +%% For compatibility with versions that don't support sync batching. +-define(DEFAULT_BATCH_SIZE, 1). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -365,6 +368,14 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> ok end. +sync_batch_size(#amqqueue{} = Q) -> + case policy(<<"ha-sync-batch-size">>, Q) of + BatchSize when BatchSize > 1 -> + BatchSize; + _ -> + ?DEFAULT_BATCH_SIZE + end. + update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 0bca3b2cc4..3ef4e9d9ac 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/7, slave/7]). +-export([master_prepare/4, master_go/8, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -65,9 +65,10 @@ -spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), log_fun(), [pid()]) -> pid()). --spec(master_go/7 :: (pid(), reference(), log_fun(), +-spec(master_go/8 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), + non_neg_integer(), bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | @@ -90,13 +91,13 @@ master_prepare(Ref, QName, Log, SPids) -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; {ready, Syncer} -> EmitStats({syncing, 0}), - case maybe_batch() of + case maybe_batch(SyncBatchSize) of true -> master_batch_go0(Args, BQ, BQS); false -> @@ -202,7 +203,10 @@ handle_set_maximum_since_use() -> ok end. -maybe_batch() -> true. +maybe_batch(SyncBatchSize) when SyncBatchSize > 1 -> + true; +maybe_batch(_SyncBatchSize) -> + false. %% Master %% --------------------------------------------------------------------------- |
