diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-10-12 20:01:11 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-10-12 20:01:11 +0300 |
| commit | 44a0ddb72dc0337235bcecf63878658bac4288a4 (patch) | |
| tree | 0754e646b74ac3290836fe391f446e98904e64f3 /src | |
| parent | 63cc2bb9391a34901de50b092f59045b1daf9289 (diff) | |
| parent | dc72935607e5bf6c563556139ea6992899b8520a (diff) | |
| download | rabbitmq-server-git-44a0ddb72dc0337235bcecf63878658bac4288a4.tar.gz | |
Merge pull request #344 from rabbitmq/rabbitmq-server-336
Implements Mirror Queue Sync in Batches
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 169 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 132 |
7 files changed, 407 insertions, 116 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a03bda13c9..2b808e206c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,6 +33,10 @@ -type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). +-type(publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties(), boolean()}). +-type(delivered_publish() :: {rabbit_types:basic_message(), + rabbit_types:message_properties()}). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: @@ -104,6 +108,9 @@ rabbit_types:message_properties(), boolean(), pid(), flow(), state()) -> state(). +%% Like publish/6 but for batches of publishes. +-callback batch_publish([publish()], pid(), flow(), state()) -> state(). + %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). @@ -112,6 +119,11 @@ state()) -> {ack(), state()}. +%% Like publish_delivered/5 but for batches of publishes. +-callback batch_publish_delivered([delivered_publish()], pid(), flow(), + state()) + -> {[ack()], state()}. + %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. -callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). @@ -253,8 +265,9 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1}, - {purge_acks, 1}, {publish, 6}, - {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, + {purge_acks, 1}, {publish, 6}, {publish_delivered, 5}, + {batch_publish, 4}, {batch_publish_delivered, 4}, + {discard, 4}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 7890128872..ee3a097a80 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,6 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/6, publish_delivered/5, + batch_publish/4, batch_publish_delivered/4, discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -147,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats, QName, "Synchronising: " ++ Fmt ++ "~n", Params) end, Log("~p messages to synchronise", [BQ:len(BQS)]), - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName), + SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q), + Log("batch size: ~p", [SyncBatchSize]), Ref = make_ref(), Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go( - Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of + Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; @@ -241,6 +244,27 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). +batch_publish(Publishes, ChPid, Flow, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {Publishes1, false, MsgSizes} = + lists:foldl(fun ({Msg = #basic_message { id = MsgId }, + MsgProps, _IsDelivered}, {Pubs, false, Sizes}) -> + {[{Msg, MsgProps, true} | Pubs], %% [0] + false = dict:is_key(MsgId, SS), %% ASSERTION + Sizes + rabbit_basic:msg_size(Msg)} + end, {[], false, 0}, Publishes), + Publishes2 = lists:reverse(Publishes1), + ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2}, + MsgSizes), + BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS), + ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). +%% [0] When the slave process handles the publish command, it sets the +%% IsDelivered flag to true, so to avoid iterating over the messages +%% again at the slave, we do it here. + publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, Flow, State = #state { gm = GM, seen_status = SS, @@ -253,6 +277,23 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. +batch_publish_delivered(Publishes, ChPid, Flow, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {false, MsgSizes} = + lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, + {false, Sizes}) -> + {false = dict:is_key(MsgId, SS), %% ASSERTION + Sizes + rabbit_basic:msg_size(Msg)} + end, {false, 0}, Publishes), + ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes}, + MsgSizes), + {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + {AckTags, ensure_monitoring(ChPid, State1)}. + discard(MsgId, ChPid, Flow, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index fee890476e..b8997faea5 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,7 +22,7 @@ initial_queue_node/2, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, - log_info/3, log_warning/3]). + sync_batch_size/1, log_info/3, log_warning/3]). %% for testing only -export([module/1]). @@ -39,10 +39,13 @@ {mfa, {rabbit_registry, register, [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -332,6 +335,14 @@ module(Mode) when is_binary(Mode) -> end end. +validate_mode(Mode) -> + case module(Mode) of + {ok, _Module} -> + ok; + not_mirrored -> + {error, "~p is not a valid ha-mode value", [Mode]} + end. + is_mirrored(Q) -> case module(Q) of {ok, _} -> true; @@ -355,6 +366,22 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> ok end. +sync_batch_size(#amqqueue{} = Q) -> + case policy(<<"ha-sync-batch-size">>, Q) of + none -> %% we need this case because none > 1 == true + default_batch_size(); + BatchSize when BatchSize > 1 -> + BatchSize; + _ -> + default_batch_size() + end. + +-define(DEFAULT_BATCH_SIZE, 4096). + +default_batch_size() -> + rabbit_misc:get_env(rabbit, mirroring_sync_batch_size, + ?DEFAULT_BATCH_SIZE). + update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of @@ -410,25 +437,37 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), + SyncBatchSize = proplists:get_value( + <<"ha-sync-batch-size">>, KeyList, none), PromoteOnShutdown = proplists:get_value( <<"ha-promote-on-shutdown">>, KeyList, none), - case {Mode, Params, SyncMode, PromoteOnShutdown} of - {none, none, none, none} -> + case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of + {none, none, none, none, none} -> ok; - {none, _, _, _} -> + {none, _, _, _, _} -> {error, "ha-mode must be specified to specify ha-params, " "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> - case module(Mode) of - {ok, M} -> case M:validate_policy(Params) of - ok -> case validate_sync_mode(SyncMode) of - ok -> validate_pos(PromoteOnShutdown); - E -> E - end; - E -> E - end; - _ -> {error, "~p is not a valid ha-mode value", [Mode]} - end + validate_policies( + [{Mode, fun validate_mode/1}, + {Params, ha_params_validator(Mode)}, + {SyncMode, fun validate_sync_mode/1}, + {SyncBatchSize, fun validate_sync_batch_size/1}, + {PromoteOnShutdown, fun validate_pos/1}]) + end. + +ha_params_validator(Mode) -> + fun(Val) -> + {ok, M} = module(Mode), + M:validate_policy(Val) + end. + +validate_policies([]) -> + ok; +validate_policies([{Val, Validator} | Rest]) -> + case Validator(Val) of + ok -> validate_policies(Rest); + E -> E end. validate_sync_mode(SyncMode) -> @@ -440,6 +479,14 @@ validate_sync_mode(SyncMode) -> "or \"automatic\", got ~p", [Mode]} end. +validate_sync_batch_size(none) -> + ok; +validate_sync_batch_size(N) when is_integer(N) andalso N > 0 -> + ok; +validate_sync_batch_size(N) -> + {error, "ha-sync-batch-size takes an integer greather than 0, " + "~p given", [N]}. + validate_pos(PromoteOnShutdown) -> case PromoteOnShutdown of <<"always">> -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7f309ab0b7..5da91c70c5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps, publish_or_discard(published, ChPid, MsgId, State), BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({batch_publish, ChPid, Flow, Publishes}, State) -> + maybe_flow_ack(ChPid, Flow), + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + lists:foldl(fun ({#basic_message { id = MsgId }, + _MsgProps, _IsDelivered}, St) -> + publish_or_discard(published, ChPid, MsgId, St) + end, State, Publishes), + BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({publish_delivered, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> maybe_flow_ack(ChPid, Flow), @@ -860,6 +869,24 @@ process_instruction({publish_delivered, ChPid, Flow, MsgProps, {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; +process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) -> + maybe_flow_ack(ChPid, Flow), + {MsgIds, + State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} = + lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps}, + {MsgIds, St}) -> + {[MsgId | MsgIds], + publish_or_discard(published, ChPid, MsgId, St)} + end, {[], State}, Publishes), + true = BQ:is_empty(BQS), + {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS), + MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags), + State2 = lists:foldl( + fun ({MsgId, AckTag}, St) -> + maybe_store_ack(true, MsgId, AckTag, St) + end, State1 #state { backing_queue_state = BQS1 }, + MsgIdsAndAcks), + {ok, State2}; process_instruction({discard, ChPid, Flow, MsgId}, State) -> maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index b76422ee6b..534ef1afad 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/7, slave/7]). +-export([master_prepare/4, master_go/8, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -45,7 +45,7 @@ %% || <--- ready ---- || || %% || <--- next* ---- || || } %% || ---- msg* ----> || || } loop -%% || || ---- sync_msg* ----> || } +%% || || ---- sync_msgs* ---> || } %% || || <--- (credit)* ----- || } %% || <--- next ---- || || %% || ---- done ----> || || @@ -63,9 +63,10 @@ -spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(), log_fun(), [pid()]) -> pid()). --spec(master_go/7 :: (pid(), reference(), log_fun(), +-spec(master_go/8 :: (pid(), reference(), log_fun(), rabbit_mirror_queue_master:stats_fun(), rabbit_mirror_queue_master:stats_fun(), + non_neg_integer(), bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | @@ -88,48 +89,65 @@ master_prepare(Ref, QName, Log, SPids) -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; {ready, Syncer} -> EmitStats({syncing, 0}), - master_go0(Args, BQ, BQS) + master_batch_go0(Args, SyncBatchSize, + BQ, BQS) end. -master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> - master_send(Msg, MsgProps, Unacked, Args, Acc) - end, {0, time_compat:monotonic_time()}, BQS) of +master_batch_go0(Args, BatchSize, BQ, BQS) -> + FoldFun = + fun (Msg, MsgProps, Unacked, Acc) -> + Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc), + case maybe_master_batch_send(Acc1, BatchSize) of + true -> master_batch_send(Args, Acc1); + false -> {cont, Acc1} + end + end, + FoldAcc = {[], 0, {0, BQ:depth(BQS)}, time_compat:monotonic_time()}, + bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). + +master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {Batch, I, {Curr, Len}, Last}) -> + T = maybe_emit_stats(Last, I, EmitStats, Log), + HandleInfo({syncing, I}), + handle_set_maximum_since_use(), + SyncMsg = {msgs, Ref, lists:reverse(Batch)}, + NewAcc = {[], I + length(Batch), {Curr, Len}, T}, + master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). + +%% Either send messages when we reach the last one in the queue or +%% whenever we have accumulated BatchSize messages. +maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) -> + true; +maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize) + when Curr rem BatchSize =:= 0 -> + true; +maybe_master_batch_send(_Acc, _BatchSize) -> + false. + +bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) -> + case BQ:fold(FoldFun, FoldAcc, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, Unacked, - {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> - Interval = time_compat:convert_time_unit( - time_compat:monotonic_time() - Last, native, micro_seconds), - T = case Interval > ?SYNC_PROGRESS_INTERVAL of - true -> EmitStats({syncing, I}), - Log("~p messages", [I]), - time_compat:monotonic_time(); - false -> Last - end, - HandleInfo({syncing, I}), - receive - {'$gen_cast', {set_maximum_since_use, Age}} -> - ok = file_handle_cache:set_maximum_since_use(Age) - after 0 -> - ok - end, +append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) -> + {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}. + +master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) -> receive {'$gen_call', From, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, - {cont, {I + 1, T}}; + {next, Ref} -> Syncer ! SyncMsg, + {cont, NewAcc}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. @@ -149,6 +167,24 @@ stop_syncer(Syncer, Msg) -> after 0 -> ok end. +maybe_emit_stats(Last, I, EmitStats, Log) -> + Interval = time_compat:convert_time_unit( + time_compat:monotonic_time() - Last, native, micro_seconds), + case Interval > ?SYNC_PROGRESS_INTERVAL of + true -> EmitStats({syncing, I}), + Log("~p messages", [I]), + time_compat:monotonic_time(); + false -> Last + end. + +handle_set_maximum_since_use() -> + receive + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age) + after 0 -> + ok + end. + %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -184,12 +220,9 @@ await_slaves(Ref, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps, Unacked} -> + {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), - [begin - credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} - end || SPid <- SPids1], + broadcast(SPids1, {sync_msgs, Ref, Msgs}), syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -200,6 +233,12 @@ syncer_loop(Ref, MPid, SPids) -> [SPid ! {sync_complete, Ref} || SPid <- SPids] end. +broadcast(SPids, Msg) -> + [begin + credit_flow:send(SPid), + SPid ! Msg + end || SPid <- SPids]. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -260,17 +299,9 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), slave_sync_loop(Args, {MA, TRef1, BQS1}); - {sync_msg, Ref, Msg, Props, Unacked} -> + {sync_msgs, Ref, Batch} -> credit_flow:ack(Syncer), - Props1 = Props#message_properties{needs_confirming = false}, - {MA1, BQS1} = - case Unacked of - false -> {MA, - BQ:publish(Msg, Props1, true, none, noflow, BQS)}; - true -> {AckTag, BQS2} = BQ:publish_delivered( - Msg, Props1, none, noflow, BQS), - {[{Msg#basic_message.id, AckTag} | MA], BQS2} - end, + {MA1, BQS1} = process_batch(Batch, MA, BQ, BQS), slave_sync_loop(Args, {MA1, TRef, BQS1}); {'EXIT', Parent, Reason} -> {stop, Reason, State}; @@ -279,3 +310,55 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, BQ:delete_and_terminate(Reason, BQS), {stop, Reason, {[], TRef, undefined}} end. + +%% We are partitioning messages by the Unacked element in the tuple. +%% when unacked = true, then it's a publish_delivered message, +%% otherwise it's a publish message. +%% +%% Note that we can't first partition the batch and then publish each +%% part, since that would result in re-ordering messages, which we +%% don't want to do. +process_batch([], MA, _BQ, BQS) -> + {MA, BQS}; +process_batch(Batch, MA, BQ, BQS) -> + {_Msg, _MsgProps, Unacked} = hd(Batch), + process_batch(Batch, Unacked, [], MA, BQ, BQS). + +process_batch([{Msg, Props, true = Unacked} | Rest], true = Unacked, + Acc, MA, BQ, BQS) -> + %% publish_delivered messages don't need the IsDelivered flag, + %% therefore we just add {Msg, Props} to the accumulator. + process_batch(Rest, Unacked, [{Msg, props(Props)} | Acc], + MA, BQ, BQS); +process_batch([{Msg, Props, false = Unacked} | Rest], false = Unacked, + Acc, MA, BQ, BQS) -> + %% publish messages needs the IsDelivered flag which is set to true + %% here. + process_batch(Rest, Unacked, [{Msg, props(Props), true} | Acc], + MA, BQ, BQS); +process_batch(Batch, Unacked, Acc, MA, BQ, BQS) -> + {MA1, BQS1} = publish_batch(Unacked, lists:reverse(Acc), MA, BQ, BQS), + process_batch(Batch, MA1, BQ, BQS1). + +%% Unacked msgs are published via batch_publish. +publish_batch(false, Batch, MA, BQ, BQS) -> + batch_publish(Batch, MA, BQ, BQS); +%% Acked msgs are published via batch_publish_delivered. +publish_batch(true, Batch, MA, BQ, BQS) -> + batch_publish_delivered(Batch, MA, BQ, BQS). + + +batch_publish(Batch, MA, BQ, BQS) -> + BQS1 = BQ:batch_publish(Batch, none, noflow, BQS), + {MA, BQS1}. + +batch_publish_delivered(Batch, MA, BQ, BQS) -> + {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS), + MA1 = + lists:foldl(fun ({{Msg, _}, AckTag}, MAs) -> + [{Msg#basic_message.id, AckTag} | MAs] + end, MA, lists:zip(Batch, AckTags)), + {MA1, BQS1}. + +props(Props) -> + Props#message_properties{needs_confirming = false}. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index a839badfc4..28cee163a1 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -35,6 +35,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, publish/6, publish_delivered/5, discard/4, drain_confirmed/1, + batch_publish/4, batch_publish_delivered/4, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -203,6 +204,19 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). +batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> + PubDict = publishes_by_priority( + Publishes, fun ({Msg, _, _}) -> Msg end), + lists:foldl( + fun ({Priority, Pubs}, St) -> + pick1(fun (_P, BQSN) -> + BQ:batch_publish(Pubs, ChPid, Flow, BQSN) + end, Priority, St) + end, State, orddict:to_list(PubDict)); +batch_publish(Publishes, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)). + publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) -> pick2(fun (P, BQSN) -> {AckTag, BQSN1} = BQ:publish_delivered( @@ -213,6 +227,26 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). +batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> + PubDict = publishes_by_priority( + Publishes, fun ({Msg, _}) -> Msg end), + {PrioritiesAndAcks, State1} = + lists:foldl( + fun ({Priority, Pubs}, {PriosAndAcks, St}) -> + {PriosAndAcks1, St1} = + pick2(fun (P, BQSN) -> + {AckTags, BQSN1} = + BQ:batch_publish_delivered( + Pubs, ChPid, Flow, BQSN), + {{P, AckTags}, BQSN1} + end, Priority, St), + {[PriosAndAcks1 | PriosAndAcks], St1} + end, {[], State}, orddict:to_list(PubDict)), + {lists:reverse(PrioritiesAndAcks), State1}; +batch_publish_delivered(Publishes, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)). + %% TODO this is a hack. The BQ api does not give us enough information %% here - if we had the Msg we could look at its priority and forward %% to the appropriate sub-BQ. But we don't so we are stuck. @@ -532,6 +566,11 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- +publishes_by_priority(Publishes, ExtractMsg) -> + lists:foldl(fun (Pub, Dict) -> + Msg = ExtractMsg(Pub), + rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) + end, orddict:new(), Publishes). priority(P, BQSs) when is_integer(P) -> {P, bq_fetch(P, BQSs)}; @@ -540,18 +579,21 @@ priority(#basic_message{content = Content}, BQSs) -> priority1(_Content, [{P, BQSN}]) -> {P, BQSN}; -priority1(Content = #content{properties = Props}, - [{P, BQSN} | Rest]) -> - #'P_basic'{priority = Priority0} = Props, - Priority = case Priority0 of - undefined -> 0; - _ when is_integer(Priority0) -> Priority0 - end, - case Priority >= P of +priority1(Content, [{P, BQSN} | Rest]) -> + case priority2(Content) >= P of true -> {P, BQSN}; false -> priority1(Content, Rest) end. +priority2(#basic_message{content = Content}) -> + priority2(rabbit_binary_parser:ensure_content_decoded(Content)); +priority2(#content{properties = Props}) -> + #'P_basic'{priority = Priority0} = Props, + case Priority0 of + undefined -> 0; + _ when is_integer(Priority0) -> Priority0 + end. + add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; add_maybe_infinity(A, B) -> A + B. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a0e28e88f6..5e7672e7f0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,9 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/6, publish_delivered/5, discard/4, drain_confirmed/1, + publish/6, publish_delivered/5, + batch_publish/4, batch_publish_delivered/4, + discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -558,52 +560,32 @@ purge(State = #vqstate { len = Len }) -> purge_acks(State) -> a(purge_pending_ack(false, State)). -publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - InCount1 = InCount + 1, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({1, 0}, {none, MsgStatus1}, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }), - a(reduce_memory_use(maybe_update_rates(State3))). - -publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, - State = #vqstate { qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> + State1 = + publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, + fun maybe_write_to_disk/4, + State), + a(reduce_memory_use(maybe_update_rates(State1))). + +batch_publish(Publishes, ChPid, Flow, State) -> + {ChPid, Flow, State1} = + lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), + State2 = ui(State1), + a(reduce_memory_use(maybe_update_rates(State2))). + +publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> + {SeqId, State1} = + publish_delivered1(Msg, MsgProps, ChPid, Flow, + fun maybe_write_to_disk/4, + State), + {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. + +batch_publish_delivered(Publishes, ChPid, Flow, State) -> + {ChPid, Flow, SeqIds, State1} = + lists:foldl(fun batch_publish_delivered1/2, + {ChPid, Flow, [], State}, Publishes), + State2 = ui(State1), + {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. discard(_MsgId, _ChPid, _Flow, State) -> State. @@ -1563,6 +1545,62 @@ process_delivers_and_acks_fun(_) -> %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- +publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, _ChPid, _Flow, PersistFun, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } + end, + InCount1 = InCount + 1, + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }). + +batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> + {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, + fun maybe_prepare_write_to_disk/4, State)}. + +publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, + _ChPid, _Flow, PersistFun, + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), + State2 = record_pending_ack(m(MsgStatus1), State1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), + {SeqId, State3}. + +batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> + {SeqId, State1} = + publish_delivered1(Msg, MsgProps, ChPid, Flow, + fun maybe_prepare_write_to_disk/4, + State), + {ChPid, Flow, [SeqId | SeqIds], State1}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_in_store = true }, State) -> |
